public IOperatorNodePushable createPushRuntime()

in hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java [344:716]


        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
                        throws HyracksDataException {

            final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
            final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
            final ITuplePairComparator nljComparatorProbe2Build = tuplePairComparatorFactoryProbe2Build
                    .createTuplePairComparator(ctx);
            final ITuplePairComparator nljComparatorBuild2Probe = tuplePairComparatorFactoryBuild2Probe
                    .createTuplePairComparator(ctx);
            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
                    .createPredicateEvaluator());

            for (int i = 0; i < comparatorFactories.length; i++) {
                comparators[i] = comparatorFactories[i].createBinaryComparator();
            }

            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
            final ArrayTupleBuilder nullTupleBuild = isLeftOuter ?
                    new ArrayTupleBuilder(buildRd.getFieldCount()) :
                    null;
            if (isLeftOuter) {
                DataOutput out = nullTupleBuild.getDataOutput();
                for (int i = 0; i < nullWriterFactories1.length; i++) {
                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
                    nullWriters1[i].writeNull(out);
                    nullTupleBuild.addFieldEndOffset();
                }
            }

            IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                private BuildAndPartitionTaskState state;
                private IFrame rPartbuff = new VSizeFrame(ctx);

                private FrameTupleAppender nullResultAppender = null;
                private FrameTupleAccessor probeTupleAccessor;

                /**
                 * Fetches the state object registered under the build-and-partition activity's task id for this
                 * partition, opens the output writer and starts the probe phase of the hybrid hash join held by
                 * that state.
                 *
                 * @throws HyracksDataException if opening the writer or initializing the probe phase fails
                 */
                @Override
                public void open() throws HyracksDataException {
                    final ActivityId buildActivityId =
                            new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID);
                    final TaskId buildTaskId = new TaskId(buildActivityId, partition);
                    state = (BuildAndPartitionTaskState) ctx.getStateObject(buildTaskId);

                    writer.open();
                    state.hybridHJ.initProbe();

                    if (!LOGGER.isLoggable(Level.FINE)) {
                        return;
                    }
                    LOGGER.fine("OptimizedHybridHashJoin is starting the probe phase.");
                }

                /**
                 * Hands the incoming probe frame to the hybrid hash join stored in {@code state}, together with the
                 * output writer.
                 *
                 * @param buffer the probe-side frame to process
                 * @throws HyracksDataException if probing fails
                 */
                @Override
                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                    state.hybridHJ.probe(buffer, writer);
                }

                /**
                 * Forwards the failure notification to the output writer.
                 *
                 * @throws HyracksDataException if the writer's {@code fail()} throws
                 */
                @Override
                public void fail() throws HyracksDataException {
                    writer.fail();
                }

                /**
                 * Finishes the probe phase and then processes every partition whose bit is set in the join's
                 * partition status: a partition with both a build and a probe run file is joined via
                 * {@code joinPartitionPair} at level 1; for a left-outer join, a partition that only has a probe
                 * run file gets its probe tuples emitted with the null build tuple appended. The output writer is
                 * closed in all cases.
                 *
                 * @throws HyracksDataException if any step of the join or closing the writer fails
                 */
                @Override
                public void close() throws HyracksDataException {
                    try {
                        state.hybridHJ.closeProbe(writer);

                        final BitSet status = state.hybridHJ.getPartitionStatus();

                        rPartbuff.reset();
                        int pid = status.nextSetBit(0);
                        while (pid >= 0) {
                            final RunFileReader buildReader = state.hybridHJ.getBuildRFReader(pid);
                            final RunFileReader probeReader = state.hybridHJ.getProbeRFReader(pid);

                            if (buildReader != null && probeReader != null) {
                                final int buildTuples = state.hybridHJ.getBuildPartitionSizeInTup(pid);
                                final int probeTuples = state.hybridHJ.getProbePartitionSizeInTup(pid);
                                joinPartitionPair(buildReader, probeReader, buildTuples, probeTuples, 1);
                            } else if (isLeftOuter && probeReader != null) {
                                // Only the probe side has data: emit its tuples padded with nulls.
                                appendNullToProbeTuples(probeReader);
                            }
                            pid = status.nextSetBit(pid + 1);
                        }
                    } finally {
                        writer.close();
                    }
                    if (LOGGER.isLoggable(Level.FINE)) {
                        LOGGER.fine("OptimizedHybridHashJoin closed its probe phase");
                    }
                }

                /**
                 * Joins one build/probe partition pair read from run files, choosing between an in-memory hash
                 * join and a further round of hybrid hash join, each with or without role reversal.
                 * <p>
                 * The buildSideReader should always be the original build side, and the probeSideReader the
                 * original probe side; role reversal is decided here, not by the caller.
                 *
                 * @param buildSideReader  reader over the build-side run file of this partition
                 * @param probeSideReader  reader over the probe-side run file of this partition
                 * @param buildSizeInTuple number of tuples in the build partition
                 * @param probeSizeInTuple number of tuples in the probe partition
                 * @param level            recursion level; also passed to {@code createPartitioner} so that each
                 *                         level gets its own partitioner
                 * @throws HyracksDataException if the side chosen for the in-memory hash table has zero tuples,
                 *                              or if the underlying join fails
                 */
                private void joinPartitionPair(RunFileReader buildSideReader, RunFileReader probeSideReader,
                        int buildSizeInTuple, int probeSizeInTuple, int level)
                        throws HyracksDataException {
                    ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerFamily(probeKeys,
                            hashFunctionGeneratorFactories).createPartitioner(level);
                    ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
                            hashFunctionGeneratorFactories).createPartitioner(level);

                    // Partition sizes in frames (file size over the initial frame size).
                    // NOTE(review): integer division drops a trailing partial frame - confirm run files are
                    // always a multiple of the initial frame size.
                    long buildPartSize = buildSideReader.getFileSize() / ctx.getInitialFrameSize();
                    long probePartSize = probeSideReader.getFileSize() / ctx.getInitialFrameSize();
                    int beforeMax = Math.max(buildSizeInTuple, probeSizeInTuple);

                    if (LOGGER.isLoggable(Level.FINE)) {
                        // The partition id is not available in this method, so it is not part of the message.
                        LOGGER.fine(
                                "\n>>>Joining Partition Pairs (thread_id " + Thread.currentThread().getId()
                                        + ") - (level " + level + ")"
                                        + " - BuildSize:\t" + buildPartSize + "\tProbeSize:\t" + probePartSize
                                        + " - MemForJoin "
                                        + (state.memForJoin)
                                        + "  - LeftOuter is " + isLeftOuter);
                    }

                    //Apply in-Mem HJ if possible
                    if (!skipInMemoryHJ && ((buildPartSize < state.memForJoin)
                            || (probePartSize < state.memForJoin && !isLeftOuter))) {
                        if (!forceRR && (isLeftOuter || (buildPartSize
                                < probePartSize))) { //Case 1.1 - InMemHJ (wout Role-Reversal)
                            if (LOGGER.isLoggable(Level.FINE)) {
                                LOGGER.fine("\t>>>Case 1.1 (IsLeftOuter || buildSize<probe) AND ApplyInMemHJ - [Level "
                                        + level + "]");
                            }
                            int tabSize = buildSizeInTuple;
                            if (tabSize == 0) {
                                throw new HyracksDataException(
                                        "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
                            }
                            //Build Side is smaller
                            applyInMemHashJoin(buildKeys, probeKeys, tabSize, buildRd, probeRd, buildHpc,
                                    probeHpc, buildSideReader, probeSideReader); // checked-confirmed
                        } else { //Case 1.2 - InMemHJ with Role Reversal
                            if (LOGGER.isLoggable(Level.FINE)) {
                                LOGGER.fine(
                                        "\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJ WITH RoleReversal - [Level "
                                                + level + "]");
                            }
                            int tabSize = probeSizeInTuple;
                            if (tabSize == 0) {
                                throw new HyracksDataException(
                                        "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
                            }
                            //Probe Side is smaller
                            applyInMemHashJoin(probeKeys, buildKeys, tabSize, probeRd, buildRd, probeHpc,
                                    buildHpc, probeSideReader, buildSideReader); // checked-confirmed
                        }
                    }
                    //Apply (Recursive) HHJ
                    else {
                        if (LOGGER.isLoggable(Level.FINE)) {
                            LOGGER.fine("\t>>>Case 2. ApplyRecursiveHHJ - [Level " + level + "]");
                        }
                        if (!forceRR && (isLeftOuter
                                || buildPartSize < probePartSize)) { //Case 2.1 - Recursive HHJ (wout Role-Reversal)
                            if (LOGGER.isLoggable(Level.FINE)) {
                                LOGGER.fine("\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
                                        + level + "]");
                            }
                            applyHybridHashJoin((int) buildPartSize, PROBE_REL, BUILD_REL, probeKeys, buildKeys,
                                    probeRd, buildRd, probeHpc, buildHpc, probeSideReader, buildSideReader, level,
                                    beforeMax);

                        } else { //Case 2.2 - Recursive HHJ (with Role-Reversal)
                            if (LOGGER.isLoggable(Level.FINE)) {
                                LOGGER.fine(
                                        "\t\t>>>Case 2.2. - RecursiveHHJ WITH RoleReversal - [Level " + level + "]");
                            }

                            // Roles are swapped: the original probe side is handed over as the build side.
                            applyHybridHashJoin((int) probePartSize, BUILD_REL, PROBE_REL, buildKeys, probeKeys,
                                    buildRd, probeRd, buildHpc, probeHpc, buildSideReader, probeSideReader, level,
                                    beforeMax);

                        }
                    }
                }

                /**
                 * Runs one round of hybrid hash join over a partition pair and then handles the partitions that
                 * round reports in its partition status, either by recursing through {@code joinPartitionPair}
                 * or by switching to nested loop join.
                 * <p>
                 * The arguments describe the roles for this round. When the caller has swapped roles, the
                 * "probe" arguments refer to the operator's original build side and vice versa; this is
                 * detected by comparing the key arrays with the descriptor's own.
                 *
                 * @param tableSize       size estimate passed to {@code getNumberOfPartitions}
                 * @param PROBE_REL       relation name used for the probe role in this round
                 * @param BUILD_REL       relation name used for the build role in this round
                 * @param probeKeys       key fields of the probe role
                 * @param buildKeys       key fields of the build role
                 * @param probeRd         record descriptor of the probe role
                 * @param buildRd         record descriptor of the build role
                 * @param probeHpc        partition computer for the probe role
                 * @param buildHpc        partition computer for the build role
                 * @param probeSideReader run file reader feeding the probe phase
                 * @param buildSideReader run file reader feeding the build phase
                 * @param level           current recursion level
                 * @param beforeMax       larger of the two partition sizes (in tuples) before this round
                 * @throws HyracksDataException if building, probing or a nested join fails
                 */
                private void applyHybridHashJoin(int tableSize,
                        final String PROBE_REL, final String BUILD_REL,
                        final int[] probeKeys, final int[] buildKeys,
                        final RecordDescriptor probeRd, final RecordDescriptor buildRd,
                        final ITuplePartitionComputer probeHpc, final ITuplePartitionComputer buildHpc,
                        RunFileReader probeSideReader, RunFileReader buildSideReader,
                        final int level, final long beforeMax)
                        throws HyracksDataException {

                    boolean isReversed = probeKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys
                            && buildKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;

                    assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";

                    OptimizedHybridHashJoin rHHj;
                    int n = getNumberOfPartitions(state.memForJoin, tableSize, fudgeFactor,
                            nPartitions);
                    rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL,
                            probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc,
                            predEvaluator, isLeftOuter, nullWriterFactories1); //checked-confirmed

                    rHHj.setIsReversed(isReversed);

                    // Build phase. The reader is closed even if building fails, so the run file is not leaked.
                    buildSideReader.open();
                    try {
                        rHHj.initBuild();
                        rPartbuff.reset();
                        while (buildSideReader.nextFrame(rPartbuff)) {
                            rHHj.build(rPartbuff.getBuffer());
                        }
                        rHHj.closeBuild();
                    } finally {
                        buildSideReader.close();
                    }

                    // Probe phase, with the same close guarantee for the probe-side reader.
                    probeSideReader.open();
                    try {
                        rHHj.initProbe();
                        rPartbuff.reset();
                        while (probeSideReader.nextFrame(rPartbuff)) {
                            rHHj.probe(rPartbuff.getBuffer(), writer);
                        }
                        rHHj.closeProbe(writer);
                    } finally {
                        probeSideReader.close();
                    }

                    int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
                    int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
                    int afterMax = Math.max(maxAfterBuildSize, maxAfterProbeSize);

                    BitSet rPStatus = rHHj.getPartitionStatus();
                    // Keep recursing only while this round shrank the largest partition below the threshold
                    // fraction of its previous size; otherwise fall back to nested loop join.
                    if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD
                            * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
                        if (LOGGER.isLoggable(Level.FINE)) {
                            LOGGER.fine(
                                    "\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
                                            + level + "]");
                        }
                        for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                            RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                            RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
                            int rbSizeInTuple = rHHj.getBuildPartitionSizeInTup(rPid);
                            int rpSizeInTuple = rHHj.getProbePartitionSizeInTup(rPid);

                            if (rbrfw == null || rprfw == null) {
                                if (isLeftOuter && rprfw != null) { // if outer join, we don't reverse
                                    appendNullToProbeTuples(rprfw);
                                }
                                continue;
                            }

                            // joinPartitionPair expects the original build side first, so undo the reversal.
                            if (isReversed) {
                                joinPartitionPair(rprfw, rbrfw, rpSizeInTuple, rbSizeInTuple, level + 1);
                            } else {
                                joinPartitionPair(rbrfw, rprfw, rbSizeInTuple, rpSizeInTuple, level + 1);
                            }
                        }

                    } else { //Case 2.1.2 - Switch to NLJ
                        if (LOGGER.isLoggable(Level.FINE)) {
                            LOGGER.fine(
                                    "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
                                            + level + "]");
                        }
                        for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                            RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                            RunFileReader rprfw = rHHj.getProbeRFReader(rPid);

                            if (rbrfw == null || rprfw == null) {
                                if (isLeftOuter && rprfw != null) { // if outer join, we don't reverse
                                    appendNullToProbeTuples(rprfw);
                                }
                                continue;
                            }

                            int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
                            int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
                            // NLJ order is outer + inner, the order is reversed from the other joins
                            if (isLeftOuter || probeSideInTups < buildSideInTups) {
                                applyNestedLoopJoin(probeRd, buildRd, frameLimit, rprfw, rbrfw); //checked-modified
                            } else {
                                applyNestedLoopJoin(buildRd, probeRd, frameLimit, rbrfw, rprfw); //checked-modified
                            }
                        }
                    }
                }

                /**
                 * Emits every tuple read from the given probe-side run file concatenated with the prepared null
                 * tuple ({@code nullTupleBuild}), then flushes the appender to the output writer. The appender
                 * and the probe tuple accessor are created on first use.
                 *
                 * @param probReader reader over the probe-side run file; opened here and always closed before
                 *                   returning, including when reading or appending fails
                 * @throws HyracksDataException if reading the run file or writing the output fails
                 */
                private void appendNullToProbeTuples(RunFileReader probReader) throws HyracksDataException {
                    if (nullResultAppender == null) {
                        nullResultAppender = new FrameTupleAppender(new VSizeFrame(ctx));
                    }
                    if (probeTupleAccessor == null) {
                        probeTupleAccessor = new FrameTupleAccessor(probeRd);
                    }
                    probReader.open();
                    try {
                        while (probReader.nextFrame(rPartbuff)) {
                            probeTupleAccessor.reset(rPartbuff.getBuffer());
                            for (int tid = 0; tid < probeTupleAccessor.getTupleCount(); tid++) {
                                FrameUtils.appendConcatToWriter(writer, nullResultAppender, probeTupleAccessor, tid,
                                        nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0,
                                        nullTupleBuild.getSize());
                            }
                        }
                    } finally {
                        // Close the run file even when appending fails.
                        probReader.close();
                    }
                    nullResultAppender.write(writer, true);
                }

                /**
                 * Joins a partition pair with an in-memory hash join: every frame of the build-role run file is
                 * copied and added to the joiner, then every frame of the probe-role run file is joined against
                 * it and the results go to the output writer.
                 * <p>
                 * Roles are considered reversed when {@code pKeys} is the descriptor's build key array and
                 * {@code bKeys} is its probe key array.
                 *
                 * @param bKeys       key fields of the build role
                 * @param pKeys       key fields of the probe role
                 * @param tabSize     size used for the hash table and the joiner
                 * @param buildRDesc  record descriptor of the build role
                 * @param probeRDesc  record descriptor of the probe role
                 * @param hpcRepBuild partition computer for the build role
                 * @param hpcRepProbe partition computer for the probe role
                 * @param bReader     run file reader for the build role; always closed once opened
                 * @param pReader     run file reader for the probe role; always closed once opened
                 * @throws HyracksDataException if reading, building or joining fails
                 */
                private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc,
                        RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepBuild,
                        ITuplePartitionComputer hpcRepProbe, RunFileReader bReader, RunFileReader pReader)
                        throws HyracksDataException {
                    boolean isReversed = pKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys
                            && bKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;

                    assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";

                    ISerializableTable table = new SerializableHashTable(tabSize, ctx);
                    InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(probeRDesc),
                            hpcRepProbe, new FrameTupleAccessor(buildRDesc), hpcRepBuild,
                            new FrameTuplePairComparator(pKeys, bKeys, comparators), isLeftOuter, nullWriters1, table,
                            predEvaluator, isReversed);

                    //build
                    bReader.open();
                    try {
                        rPartbuff.reset();
                        while (bReader.nextFrame(rPartbuff)) {
                            //We need to allocate a copyBuffer, because this buffer gets added to the buffers list in the InMemoryHashJoin
                            ByteBuffer copyBuffer = ctx.allocateFrame(rPartbuff.getFrameSize());
                            FrameUtils.copyAndFlip(rPartbuff.getBuffer(), copyBuffer);
                            joiner.build(copyBuffer);
                            rPartbuff.reset();
                        }
                    } finally {
                        // Close the build run file even when building fails.
                        bReader.close();
                    }
                    rPartbuff.reset();
                    //probe
                    pReader.open();
                    try {
                        while (pReader.nextFrame(rPartbuff)) {
                            joiner.join(rPartbuff.getBuffer(), writer);
                            rPartbuff.reset();
                        }
                    } finally {
                        // Close the probe run file even when joining fails.
                        pReader.close();
                    }
                    joiner.closeJoin(writer);
                }

                /**
                 * Joins a partition pair with a nested loop join: all frames of the inner run file are cached in
                 * the joiner first, then each frame of the outer run file is joined against that cache and the
                 * results go to the output writer.
                 * <p>
                 * The nested loop join result is outer + inner, whereas all the other joins here produce
                 * probe + build, so roles count as reversed when the outer side is the operator's build input
                 * and the inner side is its probe input.
                 *
                 * @param outerRd     record descriptor of the outer side
                 * @param innerRd     record descriptor of the inner (cached) side
                 * @param memorySize  memory budget passed to the nested loop join
                 * @param outerReader run file reader for the outer side; always closed once opened
                 * @param innerReader run file reader for the inner side; always closed once opened
                 * @throws HyracksDataException if reading, caching or joining fails
                 */
                private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
                        RunFileReader outerReader, RunFileReader innerReader)
                        throws HyracksDataException {
                    boolean isReversed = outerRd == buildRd && innerRd == probeRd;
                    assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
                    ITuplePairComparator nljComptorOuterInner = isReversed ?
                            nljComparatorBuild2Probe :
                            nljComparatorProbe2Build;
                    NestedLoopJoin nlj = new NestedLoopJoin(ctx,
                            new FrameTupleAccessor(outerRd),
                            new FrameTupleAccessor(innerRd), nljComptorOuterInner, memorySize,
                            predEvaluator, isLeftOuter, nullWriters1);
                    nlj.setIsReversed(isReversed);

                    IFrame cacheBuff = new VSizeFrame(ctx);
                    innerReader.open();
                    try {
                        while (innerReader.nextFrame(cacheBuff)) {
                            nlj.cache(cacheBuff.getBuffer());
                            cacheBuff.reset();
                        }
                        nlj.closeCache();

                        IFrame joinBuff = new VSizeFrame(ctx);
                        outerReader.open();
                        try {
                            while (outerReader.nextFrame(joinBuff)) {
                                nlj.join(joinBuff.getBuffer(), writer);
                                joinBuff.reset();
                            }

                            nlj.closeJoin(writer);
                        } finally {
                            // Close the outer run file even when joining fails.
                            outerReader.close();
                        }
                    } finally {
                        // Closed last, matching the original outer-then-inner close order.
                        innerReader.close();
                    }
                }
            };
            return op;
        }