public IOperatorNodePushable createPushRuntime()

in hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java [367:859]


        /**
         * Creates the probe-and-join runtime for one partition of this hybrid hash join.
         * <p>
         * The returned pushable fetches the {@code BuildAndPartitionTaskState} stored by the build-and-partition
         * activity of the same partition and probes it with every incoming probe frame. On {@code close()} it joins
         * every spilled build/probe partition pair:
         * <ul>
         * <li>with an in-memory hash join when one side (plus its expected hash table) fits in
         * {@code state.memForJoin};</li>
         * <li>otherwise by recursive hybrid hash join;</li>
         * <li>with nested-loop join once repartitioning no longer shrinks the largest partition below
         * {@code NLJ_SWITCH_THRESHOLD} times its previous size.</li>
         * </ul>
         *
         * @param ctx the task context
         * @param recordDescProvider provides the build-side and probe-side input record descriptors
         * @param partition the index of the partition handled by the returned runtime
         * @param nPartitions the total number of partitions
         * @return the probe-and-join operator pushable
         * @throws HyracksDataException if a tuple pair comparator cannot be created or the missing-value tuple used
         *         by the left outer join cannot be written
         */
        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
                throws HyracksDataException {

            final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
            final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
            final ITuplePairComparator probComp = tuplePairComparatorFactoryProbe2Build.createTuplePairComparator(ctx);
            final ITuplePairComparator buildComp = tuplePairComparatorFactoryBuild2Probe.createTuplePairComparator(ctx);

            // For a left outer join, pre-serialize a build-side tuple made of one "missing" value per field. It is
            // concatenated to probe tuples whose partition has no build-side run file.
            final IMissingWriter[] nonMatchWriter =
                    isLeftOuter ? new IMissingWriter[nonMatchWriterFactories.length] : null;
            final ArrayTupleBuilder nullTupleBuild =
                    isLeftOuter ? new ArrayTupleBuilder(buildRd.getFieldCount()) : null;
            if (isLeftOuter) {
                DataOutput out = nullTupleBuild.getDataOutput();
                for (int i = 0; i < nonMatchWriterFactories.length; i++) {
                    nonMatchWriter[i] = nonMatchWriterFactories[i].createMissingWriter();
                    nonMatchWriter[i].writeMissing(out);
                    nullTupleBuild.addFieldEndOffset();
                }
            }

            IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                private final IHyracksJobletContext jobletCtx = ctx.getJobletContext();
                private BuildAndPartitionTaskState state;
                // Reusable frame for reading spilled run files.
                private IFrame rPartbuff = new VSizeFrame(jobletCtx);

                // Created lazily by appendNullToProbeTuples().
                private FrameTupleAppender nullResultAppender = null;
                private FrameTupleAccessor probeTupleAccessor;
                private boolean failed = false;
                // Stays null unless setOperatorStats() is called, so every use below is null-guarded.
                IOperatorStats stats = null;

                @Override
                public void open() throws HyracksDataException {
                    // Fetch the state stored by the build-and-partition activity of the same partition.
                    state = (BuildAndPartitionTaskState) ctx.getStateObject(
                            new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition));

                    writer.open();
                    state.hybridHJ.initProbe(probComp);
                    state.hybridHJ.setOperatorStats(stats);
                }

                @Override
                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                    state.hybridHJ.probe(buffer, writer);
                }

                @Override
                public void fail() throws HyracksDataException {
                    failed = true;
                    try {
                        if (state != null && state.hybridHJ != null) {
                            state.hybridHJ.fail();
                        }
                    } finally {
                        // The downstream writer must receive the failure even if the join fails to clean up.
                        writer.fail();
                    }
                }

                @Override
                public void close() throws HyracksDataException {
                    if (failed) {
                        try {
                            // Clear temp files if fail() was called.
                            if (state != null && state.hybridHJ != null) {
                                try {
                                    state.hybridHJ.clearBuildTempFiles();
                                } finally {
                                    state.hybridHJ.clearProbeTempFiles();
                                }
                            }
                        } finally {
                            writer.close(); // writer should always be closed.
                        }
                        return;
                    }
                    Throwable ex = null;
                    try {
                        try {
                            state.hybridHJ.completeProbe(writer);
                        } finally {
                            state.hybridHJ.releaseResource();
                        }
                        // Join every partition pair that was spilled to disk during build or probe.
                        BitSet partitionStatus = state.hybridHJ.getPartitionStatus();
                        rPartbuff.reset();
                        for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid =
                                partitionStatus.nextSetBit(pid + 1)) {
                            RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
                            RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
                            if (handleOneSidedPartition(bReader, pReader)) {
                                continue;
                            }
                            int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid);
                            int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
                            joinPartitionPair(bReader, pReader, bSize, pSize, 1);
                        }
                    } catch (Exception e) {
                        ex = e;
                        boolean hasJoin = state != null && state.hybridHJ != null;
                        if (hasJoin) {
                            try {
                                state.hybridHJ.fail();
                            } catch (Exception failure) {
                                ex.addSuppressed(failure);
                            }
                        }
                        // Since writer.nextFrame() is called in the above "try" body, we have to call writer.fail()
                        // to send the failure signal to the downstream, when there is a throwable thrown.
                        CleanupUtils.fail(writer, ex);
                        // Clear temp files as this.fail() nor this.close() will no longer be called after close().
                        if (hasJoin) {
                            clearTempFilesQuietly(state.hybridHJ, ex);
                        }
                    } finally {
                        ex = CleanupUtils.close(writer, ex);
                    }
                    if (ex != null) {
                        throw HyracksDataException.create(ex);
                    }
                }

                @Override
                public void setOperatorStats(IOperatorStats stats) {
                    this.stats = stats;
                }

                /**
                 * Deletes the build and probe run files of {@code hhj}. Any cleanup failure is recorded as a
                 * suppressed exception of {@code root}, so the original failure is not masked.
                 */
                private void clearTempFilesQuietly(OptimizedHybridHashJoin hhj, Throwable root) {
                    try {
                        hhj.clearBuildTempFiles();
                    } catch (Exception e) {
                        root.addSuppressed(e);
                    }
                    try {
                        hhj.clearProbeTempFiles();
                    } catch (Exception e) {
                        root.addSuppressed(e);
                    }
                }

                /**
                 * Handles a spilled partition pair in which at least one side has no run file, so nothing can match.
                 * For a left outer join, any probe tuples are emitted padded with the missing build tuple. Both
                 * readers are closed even if emitting fails.
                 *
                 * @return {@code true} if the pair was fully handled here; {@code false} if both sides have run files
                 *         and still need to be joined
                 */
                private boolean handleOneSidedPartition(RunFileReader buildReader, RunFileReader probeReader)
                        throws HyracksDataException {
                    if (buildReader != null && probeReader != null) {
                        return false;
                    }
                    try {
                        // For the outer join, we don't reverse the role.
                        if (isLeftOuter && probeReader != null) {
                            appendNullToProbeTuples(probeReader);
                        }
                    } finally {
                        try {
                            if (buildReader != null) {
                                buildReader.close();
                            }
                        } finally {
                            if (probeReader != null) {
                                probeReader.close();
                            }
                        }
                    }
                    return true;
                }

                /**
                 * Joins one spilled partition pair at recursion {@code level}. Uses an in-memory hash join when a
                 * side fits in {@code state.memForJoin}; otherwise uses a recursive hybrid hash join.
                 * <p>
                 * The buildSideReader should be always the original buildSideReader, so should the probeSideReader.
                 */
                private void joinPartitionPair(RunFileReader buildSideReader, RunFileReader probeSideReader,
                        int buildSizeInTuple, int probeSizeInTuple, int level) throws HyracksDataException {
                    ITuplePartitionComputer probeHpc =
                            new FieldHashPartitionComputerFamily(probeKeys, propHashFunctionFactories)
                                    .createPartitioner(level);
                    ITuplePartitionComputer buildHpc =
                            new FieldHashPartitionComputerFamily(buildKeys, buildHashFunctionFactories)
                                    .createPartitioner(level);

                    // Partition sizes in frames, rounded up.
                    int frameSize = jobletCtx.getInitialFrameSize();
                    long buildPartSize = (long) Math.ceil((double) buildSideReader.getFileSize() / (double) frameSize);
                    long probePartSize = (long) Math.ceil((double) probeSideReader.getFileSize() / (double) frameSize);
                    int beforeMax = Math.max(buildSizeInTuple, probeSizeInTuple);
                    if (stats != null && stats.getLevel().get() < level) {
                        stats.getLevel().set(level);
                    }

                    if (LOGGER.isTraceEnabled()) {
                        LOGGER.trace(
                                "\n>>>Joining Partition Pairs (thread_id {}) (pid ) - (level {}) - BuildSize:\t{}\tProbeSize:\t{} - MemForJoin {}  - LeftOuter is {}",
                                Thread.currentThread().getId(), level, buildPartSize, probePartSize, state.memForJoin,
                                isLeftOuter);
                    }

                    // Calculate the expected hash table size for the both side.
                    long expectedHashTableSizeForBuildInFrame =
                            SerializableHashTable.getExpectedTableFrameCount(buildSizeInTuple, frameSize);
                    long expectedHashTableSizeForProbeInFrame =
                            SerializableHashTable.getExpectedTableFrameCount(probeSizeInTuple, frameSize);

                    boolean buildFitsInMemory =
                            buildPartSize + expectedHashTableSizeForBuildInFrame < state.memForJoin;
                    // A left outer join cannot reverse roles, so only its build side may become the hash table.
                    boolean probeFitsInMemory = !isLeftOuter
                            && probePartSize + expectedHashTableSizeForProbeInFrame < state.memForJoin;

                    //Apply in-Mem HJ if possible
                    if (buildFitsInMemory || probeFitsInMemory) {
                        // Hash the side that fits. When both fit, hash the smaller one (the build side on ties goes
                        // to the probe side, as before).
                        boolean useBuildAsTable =
                                buildFitsInMemory && (!probeFitsInMemory || buildPartSize < probePartSize);
                        int tabSize = -1;
                        if (useBuildAsTable) {
                            //Case 1.1 - InMemHJ (without Role-Reversal)
                            if (LOGGER.isTraceEnabled()) {
                                LOGGER.trace(
                                        "\t>>>Case 1.1 (IsLeftOuter || buildSize<probe) AND ApplyInMemHJ - [Level {}]",
                                        level);
                            }
                            tabSize = buildSizeInTuple;
                            if (tabSize == 0) {
                                throw new HyracksDataException(
                                        "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
                            }
                            //Build Side is smaller
                            applyInMemHashJoin(buildKeys, probeKeys, tabSize, buildRd, probeRd, buildHpc, probeHpc,
                                    buildSideReader, probeSideReader, probComp); // checked-confirmed
                        } else { //Case 1.2 - InMemHJ with Role Reversal
                            if (LOGGER.isTraceEnabled()) {
                                LOGGER.trace(
                                        "\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJWITH RoleReversal - [Level {}]",
                                        level);
                            }
                            tabSize = probeSizeInTuple;
                            if (tabSize == 0) {
                                throw new HyracksDataException(
                                        "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
                            }
                            //Probe Side is smaller
                            applyInMemHashJoin(probeKeys, buildKeys, tabSize, probeRd, buildRd, probeHpc, buildHpc,
                                    probeSideReader, buildSideReader, buildComp); // checked-confirmed
                        }
                    }
                    //Apply (Recursive) HHJ
                    else {
                        if (LOGGER.isTraceEnabled()) {
                            LOGGER.trace("\t>>>Case 2. ApplyRecursiveHHJ - [Level {}]", level);
                        }
                        if ((isLeftOuter || buildPartSize < probePartSize)) {
                            //Case 2.1 - Recursive HHJ (without Role-Reversal)
                            if (LOGGER.isTraceEnabled()) {
                                LOGGER.trace(
                                        "\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level {}]",
                                        level);
                            }
                            applyHybridHashJoin((int) buildPartSize, PROBE_REL, BUILD_REL, probeKeys, buildKeys,
                                    probeRd, buildRd, probeHpc, buildHpc, probeSideReader, buildSideReader, level,
                                    beforeMax, probComp);

                        } else { //Case 2.2 - Recursive HHJ (with Role-Reversal)
                            if (LOGGER.isTraceEnabled()) {
                                LOGGER.trace("\t\t>>>Case 2.2. - RecursiveHHJ WITH RoleReversal - [Level {}]", level);
                            }

                            applyHybridHashJoin((int) probePartSize, BUILD_REL, PROBE_REL, buildKeys, probeKeys,
                                    buildRd, probeRd, buildHpc, probeHpc, buildSideReader, probeSideReader, level,
                                    beforeMax, buildComp);

                        }
                    }
                }

                /**
                 * Repartitions a spilled partition pair with a new {@link OptimizedHybridHashJoin} and joins what
                 * stays in memory. The spilled pairs it produces are then either joined recursively at
                 * {@code level + 1}, or with nested-loop join if the largest new partition is not smaller than
                 * {@code NLJ_SWITCH_THRESHOLD * beforeMax}.
                 * <p>
                 * Parameters are given in the roles used for this pass: under role reversal the {@code probe*}
                 * arguments describe the original build input and vice versa. If anything fails, the run files
                 * created by this pass are deleted before the exception is rethrown.
                 */
                private void applyHybridHashJoin(int tableSize, final String PROBE_REL, final String BUILD_REL,
                        final int[] probeKeys, final int[] buildKeys, final RecordDescriptor probeRd,
                        final RecordDescriptor buildRd, final ITuplePartitionComputer probeHpc,
                        final ITuplePartitionComputer buildHpc, RunFileReader probeSideReader,
                        RunFileReader buildSideReader, final int level, final long beforeMax, ITuplePairComparator comp)
                        throws HyracksDataException {

                    boolean isReversed = probeKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys
                            && buildKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;
                    assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
                    OptimizedHybridHashJoin rHHj;
                    int n = getNumberOfPartitions(state.memForJoin, tableSize, fudgeFactor, nPartitions);
                    rHHj = new OptimizedHybridHashJoin(jobletCtx, state.memForJoin, n, PROBE_REL, BUILD_REL, probeRd,
                            buildRd, probeHpc, buildHpc, null, null, isLeftOuter, nonMatchWriterFactories); //checked-confirmed

                    rHHj.setIsReversed(isReversed);
                    try {
                        try {
                            buildSideReader.open();
                            try {
                                rHHj.initBuild();
                                rPartbuff.reset();
                                while (buildSideReader.nextFrame(rPartbuff)) {
                                    rHHj.build(rPartbuff.getBuffer());
                                }
                            } finally {
                                // Makes sure that files are always properly closed.
                                rHHj.closeBuild();
                            }
                        } finally {
                            buildSideReader.close();
                        }
                        try {
                            probeSideReader.open();
                            rPartbuff.reset();
                            try {
                                rHHj.initProbe(comp);
                                while (probeSideReader.nextFrame(rPartbuff)) {
                                    rHHj.probe(rPartbuff.getBuffer(), writer);
                                }
                                rHHj.completeProbe(writer);
                            } finally {
                                rHHj.releaseResource();
                            }
                        } finally {
                            // Makes sure that files are always properly closed.
                            probeSideReader.close();
                        }

                        int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
                        int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
                        int afterMax = Math.max(maxAfterBuildSize, maxAfterProbeSize);

                        BitSet rPStatus = rHHj.getPartitionStatus();
                        if ((afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) {
                            //Case 2.1.1 - Keep applying HHJ
                            if (LOGGER.isTraceEnabled()) {
                                LOGGER.trace(
                                        "\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level {}]",
                                        level);
                            }
                            for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
                                if (handleOneSidedPartition(rbrfw, rprfw)) {
                                    continue;
                                }
                                int rbSizeInTuple = rHHj.getBuildPartitionSizeInTup(rPid);
                                int rpSizeInTuple = rHHj.getProbePartitionSizeInTup(rPid);
                                // Hand the readers back in their original (unreversed) roles.
                                if (isReversed) {
                                    joinPartitionPair(rprfw, rbrfw, rpSizeInTuple, rbSizeInTuple, level + 1);
                                } else {
                                    joinPartitionPair(rbrfw, rprfw, rbSizeInTuple, rpSizeInTuple, level + 1);
                                }
                            }

                        } else { //Case 2.1.2 - Switch to NLJ
                            if (LOGGER.isTraceEnabled()) {
                                LOGGER.trace(
                                        "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level {}]",
                                        level);
                            }
                            for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
                                if (handleOneSidedPartition(rbrfw, rprfw)) {
                                    continue;
                                }
                                int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
                                int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
                                // NLJ order is outer + inner, the order is reversed from the other joins
                                if (isLeftOuter || probeSideInTups < buildSideInTups) {
                                    //checked-modified
                                    applyNestedLoopJoin(probeRd, buildRd, memSizeInFrames, rprfw, rbrfw);
                                } else {
                                    //checked-modified
                                    applyNestedLoopJoin(buildRd, probeRd, memSizeInFrames, rbrfw, rprfw);
                                }
                            }
                        }
                    } catch (Exception e) {
                        // Make sure that temporary run files generated in recursive hybrid hash joins are closed and
                        // deleted, including when building or probing rHHj itself failed.
                        clearTempFilesQuietly(rHHj, e);
                        throw e;
                    }
                }

                /**
                 * Emits every tuple read from {@code probReader}, concatenated with the pre-built missing build
                 * tuple, to the downstream writer. Used by the left outer join for partitions without build-side
                 * data. The reader is opened here and always closed before returning.
                 */
                private void appendNullToProbeTuples(RunFileReader probReader) throws HyracksDataException {
                    if (nullResultAppender == null) {
                        nullResultAppender = new FrameTupleAppender(new VSizeFrame(jobletCtx));
                    }
                    if (probeTupleAccessor == null) {
                        probeTupleAccessor = new FrameTupleAccessor(probeRd);
                    }
                    try {
                        probReader.open();
                        while (probReader.nextFrame(rPartbuff)) {
                            probeTupleAccessor.reset(rPartbuff.getBuffer());
                            for (int tid = 0; tid < probeTupleAccessor.getTupleCount(); tid++) {
                                FrameUtils.appendConcatToWriter(writer, nullResultAppender, probeTupleAccessor, tid,
                                        nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0,
                                        nullTupleBuild.getSize());
                            }
                        }
                        nullResultAppender.write(writer, true);
                    } finally {
                        probReader.close();
                    }
                }

                /**
                 * Joins a spilled partition pair entirely in memory.
                 * <p>
                 * All of {@code bReader} is loaded into a hash table of {@code tabSize} entries, backed by a frame
                 * pool of {@code state.memForJoin} frames, and then probed with every frame of {@code pReader}.
                 * The joiner's frames and table are released even if building or probing fails.
                 *
                 * @throws HyracksDataException if a build frame cannot be allocated even after compacting the table,
                 *         or on I/O failure
                 */
                private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc,
                        RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepBuild,
                        ITuplePartitionComputer hpcRepProbe, RunFileReader bReader, RunFileReader pReader,
                        ITuplePairComparator comp) throws HyracksDataException {
                    boolean isReversed = pKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys
                            && bKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;
                    assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
                    IDeallocatableFramePool framePool =
                            new DeallocatableFramePool(jobletCtx, state.memForJoin * jobletCtx.getInitialFrameSize());
                    ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool);

                    ISerializableTable table = new SerializableHashTable(tabSize, jobletCtx, bufferManager);
                    InMemoryHashJoin joiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(probeRDesc),
                            hpcRepProbe, new FrameTupleAccessor(buildRDesc), buildRDesc, hpcRepBuild, isLeftOuter,
                            nonMatchWriter, table, isReversed, bufferManager);
                    joiner.setComparator(comp);
                    try {
                        try {
                            bReader.open();
                            rPartbuff.reset();
                            while (bReader.nextFrame(rPartbuff)) {
                                if (stats != null) {
                                    stats.getBytesRead().update(rPartbuff.getBuffer().limit());
                                }
                                // We need to allocate a copyBuffer, because this buffer gets added to the buffers list
                                // in the InMemoryHashJoin.
                                ByteBuffer copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize());
                                // If a frame cannot be allocated, there may be a chance if we can compact the table,
                                // one or more frame may be reclaimed.
                                if (copyBuffer == null) {
                                    if (joiner.compactHashTable() > 0) {
                                        copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize());
                                    }
                                    if (copyBuffer == null) {
                                        // Still no frame is allocated? At this point, we have no way to get a frame.
                                        throw new HyracksDataException(
                                                "Can't allocate one more frame. Assign more memory to InMemoryHashJoin.");
                                    }
                                }
                                FrameUtils.copyAndFlip(rPartbuff.getBuffer(), copyBuffer);
                                joiner.build(copyBuffer);
                                rPartbuff.reset();
                            }
                        } finally {
                            bReader.close();
                        }
                        try {
                            //probe
                            pReader.open();
                            rPartbuff.reset();
                            while (pReader.nextFrame(rPartbuff)) {
                                if (stats != null) {
                                    stats.getBytesRead().update(rPartbuff.getBuffer().limit());
                                }
                                joiner.join(rPartbuff.getBuffer(), writer);
                                rPartbuff.reset();
                            }
                            joiner.completeJoin(writer);
                        } finally {
                            pReader.close();
                        }
                    } finally {
                        // Release the joiner's frames and table on every path, including a failed build.
                        try {
                            joiner.releaseMemory();
                        } finally {
                            joiner.closeTable();
                        }
                    }
                }

                /**
                 * Joins a spilled partition pair with a nested-loop join. {@code innerReader} is cached
                 * through the NLJ (with {@code memorySize} frames), then every frame of {@code outerReader} is
                 * joined against it.
                 */
                private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
                        RunFileReader outerReader, RunFileReader innerReader) throws HyracksDataException {
                    // The nested loop join result is outer + inner. All the other operator is probe + build.
                    // Hence the reverse relation is different.
                    boolean isReversed = outerRd == buildRd && innerRd == probeRd;
                    ITuplePairComparator nljComptorOuterInner = isReversed ? buildComp : probComp;
                    NestedLoopJoin nlj = new NestedLoopJoin(jobletCtx, new FrameTupleAccessor(outerRd),
                            new FrameTupleAccessor(innerRd), memorySize, isLeftOuter, nonMatchWriter, isReversed);
                    nlj.setComparator(nljComptorOuterInner);

                    IFrame cacheBuff = new VSizeFrame(jobletCtx);
                    try {
                        innerReader.open();
                        while (innerReader.nextFrame(cacheBuff)) {
                            nlj.cache(cacheBuff.getBuffer());
                            cacheBuff.reset();
                        }
                    } finally {
                        try {
                            nlj.closeCache();
                        } finally {
                            innerReader.close();
                        }
                    }
                    try {
                        IFrame joinBuff = new VSizeFrame(jobletCtx);
                        outerReader.open();
                        try {
                            while (outerReader.nextFrame(joinBuff)) {
                                nlj.join(joinBuff.getBuffer(), writer);
                                joinBuff.reset();
                            }
                            nlj.completeJoin(writer);
                        } finally {
                            nlj.releaseMemory();
                        }
                    } finally {
                        outerReader.close();
                    }
                }

                @Override
                public String getDisplayName() {
                    return "Hybrid Hash Join: Probe & Join";
                }
            };
            return op;
        }