private void processResults()

in phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java [152:262]


    private void processResults(List<Cell> result, boolean hasBatchLimit) throws IOException {
        if (result.isEmpty())
            return;
        Tuple tuple = useQualifierAsListIndex ? new PositionBasedResultTuple(result) : new ResultTuple(Result.create(result));
        boolean projected = false;

        // For backward compatibility. In new versions, HashJoinInfo.forceProjection()
        // always returns true.
        if (joinInfo.forceProjection()) {
            tuple = projector.projectResults(tuple, useNewValueColumnQualifier);
            projected = true;
        }

        // TODO: fix below Scanner.next() and Scanner.nextRaw() methods as well.
        if (hasBatchLimit)
            throw new UnsupportedOperationException("Cannot support join operations in scans with limit");

        int count = joinInfo.getJoinIds().length;
        boolean cont = true;
        for (int i = 0; i < count; i++) {
            if (!(joinInfo.earlyEvaluation()[i]) || hashCaches[i] == null)
                continue;
            ImmutableBytesPtr key = TupleUtil.getConcatenatedValue(tuple, joinInfo.getJoinExpressions()[i]);
            tempTuples[i] = hashCaches[i].get(key);
            JoinType type = joinInfo.getJoinTypes()[i];
            if (((type == JoinType.Inner || type == JoinType.Semi) && tempTuples[i] == null)
                    || (type == JoinType.Anti && tempTuples[i] != null)) {
                cont = false;
                break;
            }
        }
        if (cont) {
            if (projector == null) {
                int dup = 1;
                for (int i = 0; i < count; i++) {
                    dup *= (tempTuples[i] == null ? 1 : tempTuples[i].size());
                }
                for (int i = 0; i < dup; i++) {
                    offerResult(tuple, projected, result);
                }
            } else {
                KeyValueSchema schema = joinInfo.getJoinedSchema();
                if (!joinInfo.forceProjection()) { // backward compatibility
                    tuple = projector.projectResults(tuple, useNewValueColumnQualifier);
                    projected = true;
                }
                offerResult(tuple, projected, result);
                for (int i = 0; i < count; i++) {
                    boolean earlyEvaluation = joinInfo.earlyEvaluation()[i];
                    JoinType type = joinInfo.getJoinTypes()[i];
                    if (earlyEvaluation && (type == JoinType.Semi || type == JoinType.Anti))
                        continue;
                    int j = resultQueue.size();
                    while (j-- > 0) {
                        Tuple lhs = resultQueue.poll();
                        if (!earlyEvaluation) {
                            ImmutableBytesPtr key = TupleUtil.getConcatenatedValue(lhs, joinInfo.getJoinExpressions()[i]);
                            tempTuples[i] = hashCaches[i].get(key);
                            if (tempTuples[i] == null) {
                                if (type == JoinType.Inner || type == JoinType.Semi) {
                                    continue;
                                } else if (type == JoinType.Anti) {
                                    offerResult(lhs, projected, result);
                                    continue;
                                }
                            }
                        }
                        if (tempTuples[i] == null) {
                            Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ?
                                    lhs : mergeProjectedValue(
                                            lhs, schema, tempDestBitSet, null,
                                            joinInfo.getSchemas()[i], tempSrcBitSet[i],
                                            joinInfo.getFieldPositions()[i]);
                            offerResult(joined, projected, result);
                            continue;
                        }
                        for (Tuple t : tempTuples[i]) {
                            Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ?
                                    lhs : mergeProjectedValue(
                                            lhs, schema, tempDestBitSet, t,
                                            joinInfo.getSchemas()[i], tempSrcBitSet[i],
                                            joinInfo.getFieldPositions()[i]);
                            offerResult(joined, projected, result);
                        }
                    }
                }
            }
            // apply post-join filter
            Expression postFilter = joinInfo.getPostJoinFilterExpression();
            if (postFilter != null) {
                for (Iterator<Tuple> iter = resultQueue.iterator(); iter.hasNext();) {
                    Tuple t = iter.next();
                    postFilter.reset();
                    ImmutableBytesPtr tempPtr = new ImmutableBytesPtr();
                    try {
                        if (!postFilter.evaluate(t, tempPtr) || tempPtr.getLength() == 0) {
                            iter.remove();
                            continue;
                        }
                    } catch (IllegalDataException e) {
                        iter.remove();
                        continue;
                    }
                    Boolean b = (Boolean)postFilter.getDataType().toObject(tempPtr);
                    if (!Boolean.TRUE.equals(b)) {
                        iter.remove();
                    }
                }
            }
        }
    }