public void visit()

in src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java [1146:1445]


    /**
     * Translates a logical join into physical operators and wires them into
     * {@code currentPlan}.
     *
     * <p>The key expression plans of every join input are translated first. The
     * physical operator(s) created afterwards depend on the join type:
     * <ul>
     *   <li>SKEWED - a {@code POSkewedJoin} that receives one schema entry per input.</li>
     *   <li>REPLICATED - a {@code POFRJoin} with fragment index 0; input and key
     *       schemas are registered for schema-tuple generation where possible.</li>
     *   <li>MERGE / MERGESPARSE (only when {@code MapSideMergeValidator} accepts the
     *       plan) - a {@code POMergeJoin} for a two-way inner join, otherwise the
     *       result of {@code compileToMergeCogrp} followed by a flattening foreach.</li>
     *   <li>HASH / BLOOM - the result of {@code compileToLR_GR_PackTrio} followed by a
     *       flattening foreach.</li>
     * </ul>
     *
     * <p>NOTE(review): the merge path returns before {@code translateSoftLinks} is
     * called, unlike every other path - confirm this is intentional.
     *
     * @param loj the logical join operator being translated
     * @throws FrontendException if a schema cannot be obtained, a physical operator
     *         cannot be created, or the operators cannot be connected in the plan
     */
    public void visit(LOJoin loj) throws FrontendException {

        final String scope = DEFAULT_SCOPE;

        // Logical predecessors of the join, i.e. the relations being joined.
        final List<Operator> inputs = loj.getPlan().getPredecessors(loj);

        // Key expression plans grouped by the physical operator of the input they belong to.
        final MultiMap<PhysicalOperator, PhysicalPlan> keyPlansByInput =
                new LinkedMultiMap<PhysicalOperator, PhysicalPlan>();

        // The same key expression plans, one inner list per input, in input order.
        final List<List<PhysicalPlan>> keyPlansByIndex = new ArrayList<List<PhysicalPlan>>();

        // Physical operator of every join input, in input order.
        final List<PhysicalOperator> physInputs = new ArrayList<PhysicalOperator>();

        // Result types of the key expressions, one inner list per input.
        final List<List<Byte>> keyTypes = new ArrayList<List<Byte>>();

        final boolean[] innerFlags = loj.getInnerFlags();
        final String alias = loj.getAlias();
        final SourceLocation location = loj.getLocation();
        final int parallel = loj.getRequestedParallelism();

        for (int idx = 0; idx < inputs.size(); idx++) {
            final PhysicalOperator physInput = logToPhyMap.get(inputs.get(idx));
            physInputs.add(physInput);

            final List<LogicalExpressionPlan> logicalKeyPlans =
                    (List<LogicalExpressionPlan>) loj.getJoinPlan(idx);
            final List<PhysicalPlan> physicalKeyPlans = translateExpressionPlans(loj, logicalKeyPlans);

            keyPlansByIndex.add(physicalKeyPlans);
            keyPlansByInput.put(physInput, physicalKeyPlans);

            // A key may consist of several expressions (a tuple key), so the result
            // type of the first leaf of each expression plan is recorded.
            final List<Byte> memberTypes = new ArrayList<Byte>();
            for (PhysicalPlan keyPlan : physicalKeyPlans) {
                memberTypes.add(keyPlan.getLeaves().get(0).getResultType());
            }
            keyTypes.add(memberTypes);
        }

        final LOJoin.JOINTYPE joinType = loj.getJoinType();

        if (joinType == LOJoin.JOINTYPE.SKEWED) {
            POSkewedJoin skewedJoin;
            try {
                skewedJoin = new POSkewedJoin(
                        new OperatorKey(scope, nodeGen.getNextNodeId(scope)),
                        parallel, physInputs, innerFlags);
                skewedJoin.addOriginalLocation(alias, location);
                skewedJoin.setJoinPlans(keyPlansByInput);
            } catch (Exception e) {
                throw new LogicalToPhysicalTranslatorException(
                        "Skewed Join creation failed", 2015, PigException.BUG, e);
            }
            skewedJoin.setResultType(DataType.TUPLE);

            for (int idx = 0; idx < inputs.size(); idx++) {
                final Operator input = inputs.get(idx);
                if (innerFlags[idx]) {
                    // Placeholder that is never read back; it only keeps the schema
                    // index aligned with the input index for MRCompiler.
                    skewedJoin.addSchema(null);
                    continue;
                }
                // Outer side: a schema is mandatory.
                try {
                    final LogicalSchema outerSchema = ((LogicalRelationalOperator) input).getSchema();
                    if (outerSchema == null) {
                        throw new FrontendException(loj, "Cannot determine skewed join schema", 2247);
                    }
                    skewedJoin.addSchema(Util.translateSchema(outerSchema));
                } catch (FrontendException e) {
                    throw new LogicalToPhysicalTranslatorException(
                            "Couldn't set the schema for outer join", 2015, PigException.BUG, e);
                }
            }

            currentPlan.add(skewedJoin);

            for (Operator input : inputs) {
                try {
                    currentPlan.connect(logToPhyMap.get(input), skewedJoin);
                } catch (PlanException e) {
                    throw new LogicalToPhysicalTranslatorException(
                            "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
                }
            }
            logToPhyMap.put(loj, skewedJoin);
        } else if (joinType == LOJoin.JOINTYPE.REPLICATED) {
            final Schema[] inputSchemas = new Schema[inputs.size()];
            final Schema[] keySchemas = new Schema[inputs.size()];

            for (int idx = 0; idx < inputs.size(); idx++) {
                final LogicalSchema inputLogicalSchema =
                        ((LogicalRelationalOperator) inputs.get(idx)).getSchema();
                if (inputLogicalSchema == null) {
                    // No schema for this input: both array slots stay null.
                    continue;
                }

                // Register the value (whole row) schema of this input.
                final Schema valueSchema = Schema.getPigSchema(new ResourceSchema(inputLogicalSchema));
                SchemaTupleFrontend.registerToGenerateIfPossible(valueSchema, false, GenContext.FR_JOIN);
                inputSchemas[idx] = valueSchema;

                // Build the key schema; give up on it as soon as a key member is
                // untyped or complex, because nested information is not available here.
                final Schema keySchema = new Schema();
                boolean keyIsGeneratable = true;
                for (Byte keyType : keyTypes.get(idx)) {
                    if (keyType == null || DataType.isComplex(keyType.byteValue())) {
                        keyIsGeneratable = false;
                        break;
                    }
                    keySchema.add(new FieldSchema(null, keyType));
                }
                if (keyIsGeneratable) {
                    SchemaTupleFrontend.registerToGenerateIfPossible(keySchema, false, GenContext.FR_JOIN);
                    keySchemas[idx] = keySchema;
                }
            }

            int fragment = 0;
            POFRJoin frJoin;
            try {
                // No bounds check: a join is assumed to have at least two inputs.
                final boolean isLeftOuter = !innerFlags[1];

                Tuple nullTuple = null;
                if (isLeftOuter) {
                    try {
                        // A left outer join is assumed to be two-way, so the right
                        // input sits at index 1.
                        final LogicalRelationalOperator rightInput =
                                (LogicalRelationalOperator) inputs.get(1);
                        final LogicalSchema rightSchema = rightInput.getSchema();

                        // NOTE(review): if LogicalToPhysicalTranslatorException is a
                        // FrontendException, the catch below re-wraps this error - confirm.
                        if (rightSchema == null) {
                            throw new LogicalToPhysicalTranslatorException(
                                    "Input (" + rightInput.getAlias() + ") " +
                                    "on which outer join is desired should have a valid schema",
                                    1109, PigException.INPUT);
                        }

                        // One null field per column of the right input.
                        final int width = rightSchema.size();
                        nullTuple = TupleFactory.getInstance().newTuple(width);
                        for (int col = 0; col < width; col++) {
                            nullTuple.set(col, null);
                        }
                    } catch (FrontendException e) {
                        throw new LogicalToPhysicalTranslatorException(
                                "Error while determining the schema of input", 2104, PigException.BUG, e);
                    }
                }

                frJoin = new POFRJoin(
                        new OperatorKey(scope, nodeGen.getNextNodeId(scope)),
                        parallel,
                        physInputs,
                        keyPlansByIndex,
                        keyTypes,
                        null,
                        fragment,
                        isLeftOuter,
                        nullTuple,
                        inputSchemas,
                        keySchemas);
                frJoin.addOriginalLocation(alias, location);
            } catch (ExecException e1) {
                throw new VisitorException(
                        "Unable to set index on newly create POLocalRearrange.", 2058, PigException.BUG, e1);
            }
            frJoin.setResultType(DataType.TUPLE);
            currentPlan.add(frJoin);

            for (Operator input : inputs) {
                try {
                    currentPlan.connect(logToPhyMap.get(input), frJoin);
                } catch (PlanException e) {
                    throw new LogicalToPhysicalTranslatorException(
                            "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
                }
            }
            logToPhyMap.put(loj, frJoin);
        } else if ((joinType == LOJoin.JOINTYPE.MERGE || joinType == LOJoin.JOINTYPE.MERGESPARSE)
                && new MapSideMergeValidator().validateMapSideMerge(inputs, loj.getPlan())) {

            // POMergeJoin is only used for a two-way join where both sides are inner.
            final boolean usePOMergeJoin = inputs.size() == 2 && innerFlags[0] && innerFlags[1];
            PhysicalOperator mergeOp;

            if (usePOMergeJoin) {
                // Convert the left, right and joined schemas (where known) ...
                final LogicalSchema leftLogical = ((LogicalRelationalOperator) inputs.get(0)).getSchema();
                final Schema leftSchema = leftLogical == null
                        ? null
                        : Schema.getPigSchema(new ResourceSchema(leftLogical));

                final LogicalSchema rightLogical = ((LogicalRelationalOperator) inputs.get(1)).getSchema();
                final Schema rightSchema = rightLogical == null
                        ? null
                        : Schema.getPigSchema(new ResourceSchema(rightLogical));

                final LogicalSchema joinedLogical = loj.getSchema();
                final Schema mergedSchema = joinedLogical == null
                        ? null
                        : Schema.getPigSchema(new ResourceSchema(joinedLogical));

                // ... and register each available one for code generation.
                for (Schema candidate : new Schema[] { leftSchema, rightSchema, mergedSchema }) {
                    if (candidate != null) {
                        SchemaTupleFrontend.registerToGenerateIfPossible(candidate, false, GenContext.MERGE_JOIN);
                    }
                }

                // Inner join on two sorted inputs. POMergeJoin is the less
                // restrictive implementation: loaders need not implement the
                // collectable interface.
                try {
                    mergeOp = new POMergeJoin(
                            new OperatorKey(scope, nodeGen.getNextNodeId(scope)),
                            parallel,
                            physInputs,
                            keyPlansByInput,
                            keyTypes,
                            joinType,
                            leftSchema,
                            rightSchema,
                            mergedSchema);
                } catch (PlanException e) {
                    throw new LogicalToPhysicalTranslatorException(
                            "Merge Join creation failed", 2042, PigException.BUG, e);
                }
                logToPhyMap.put(loj, mergeOp);
            } else {
                // Every other case falls back to POMergeCogroup + flattening foreach.
                mergeOp = compileToMergeCogrp(loj, loj.getExpressionPlans());
            }

            mergeOp.setResultType(DataType.TUPLE);
            currentPlan.add(mergeOp);
            mergeOp.addOriginalLocation(alias, location);

            for (Operator input : inputs) {
                try {
                    currentPlan.connect(logToPhyMap.get(input), mergeOp);
                } catch (PlanException e) {
                    throw new LogicalToPhysicalTranslatorException(
                            "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
                }
            }

            if (!usePOMergeJoin) {
                // Append the foreach that flattens the cogroup output; it becomes
                // the physical operator registered for this join.
                final POForEach flatten = compileFE4Flattening(innerFlags, scope, parallel, alias, location, inputs);
                currentPlan.add(flatten);
                try {
                    currentPlan.connect(mergeOp, flatten);
                } catch (PlanException e) {
                    throw new LogicalToPhysicalTranslatorException(
                            e.getMessage(), e.getErrorCode(), e.getErrorSource(), e);
                }
                logToPhyMap.put(loj, flatten);
            }

            return;
        } else if (joinType == LOJoin.JOINTYPE.HASH || joinType == LOJoin.JOINTYPE.BLOOM) {
            final POPackage joinPackage = compileToLR_GR_PackTrio(
                    loj, loj.getCustomPartitioner(), innerFlags, loj.getExpressionPlans());
            final POForEach flatten = compileFE4Flattening(innerFlags, scope, parallel, alias, location, inputs);
            currentPlan.add(flatten);
            try {
                currentPlan.connect(joinPackage, flatten);
            } catch (PlanException e) {
                throw new LogicalToPhysicalTranslatorException(
                        e.getDetailedMessage(), e.getErrorCode(), e.getErrorSource(), e);
            }
            logToPhyMap.put(loj, flatten);

            if (joinType != LOJoin.JOINTYPE.BLOOM) {
                joinPackage.getPkgr().setPackageType(PackageType.JOIN);
            } else {
                // Two inputs that are both outer means FULL OUTER, which bloom join rejects.
                if (innerFlags.length == 2 && !innerFlags[0] && !innerFlags[1]) {
                    throw new LogicalToPhysicalTranslatorException(
                            "Error at " + loj.getLocation() + " with alias "+ loj.getAlias() +
                                    ". Bloom join cannot be used with a FULL OUTER join.",
                            1109,
                            PigException.INPUT);
                }
                joinPackage.getPkgr().setPackageType(PackageType.BLOOMJOIN);
            }
        }
        translateSoftLinks(loj);
    }