public void visit()

in src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java [563:712]


    /**
     * Translates a logical CROSS operator into physical operators, adds them to
     * {@code currentPlan} and records the resulting operator for {@code cross}
     * in {@code logToPhyMap}.
     * <p>
     * Nested cross: a single {@code POCross} is created and connected to the
     * physical operator mapped from each predecessor of {@code cross}.
     * <p>
     * Non-nested cross: for every input, a map-side {@code POForEach} (a
     * {@code GFCross} user function plus a star projection, both flattened)
     * feeds a {@code POLocalRearrange}; all local rearranges feed one
     * {@code POGlobalRearrange}, which feeds a {@code POPackage}. A final
     * {@code POForEach} projects columns {@code 1..count} as bags and flattens
     * them; that operator is the one mapped to {@code cross}.
     *
     * @param cross the logical cross operator to translate
     * @throws FrontendException if the physical operators cannot be connected
     *         (error code 2015) or the index cannot be set on a newly created
     *         {@code POLocalRearrange} (error code 2058)
     */
    public void visit(LOCross cross) throws FrontendException {
        String scope = DEFAULT_SCOPE;
        List<Operator> inputs = cross.getPlan().getPredecessors(cross);
        final int parallelism = cross.getRequestedParallelism();

        if (cross.isNested()) {
            POCross physOp = new POCross(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), parallelism);
            // Record the alias/location of the logical operator, as every other
            // physical operator created by this method does.
            physOp.addOriginalLocation(cross.getAlias(), cross.getLocation());
            currentPlan.add(physOp);
            physOp.setResultType(DataType.BAG);
            logToPhyMap.put(cross, physOp);
            for (Operator op : inputs) {
                PhysicalOperator from = logToPhyMap.get(op);
                try {
                    currentPlan.connect(from, physOp);
                } catch (PlanException e) {
                    int errCode = 2015;
                    String msg = "Invalid physical operators in the physical plan" ;
                    throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
                }
            }
            return;
        }

        POGlobalRearrange poGlobal = new POGlobalRearrange(
                new OperatorKey(scope, nodeGen.getNextNodeId(scope)), parallelism);
        poGlobal.addOriginalLocation(cross.getAlias(), cross.getLocation());
        POPackage poPackage = new POPackage(
                new OperatorKey(scope, nodeGen.getNextNodeId(scope)), parallelism);
        // The package operator gets its own location; previously the global
        // rearrange was tagged twice and the package not at all.
        poPackage.addOriginalLocation(cross.getAlias(), cross.getLocation());
        poGlobal.setCross(true);
        currentPlan.add(poGlobal);
        currentPlan.add(poPackage);

        // Number of inputs wired so far; doubles as the index of each local rearrange.
        int count = 0;

        try {
            currentPlan.connect(poGlobal, poPackage);
            // Both plans of the per-input foreach (GFCross output, star projection) are flattened.
            List<Boolean> flattenLst = Arrays.asList(true, true);

            for (Operator op : inputs) {
                // Plan 1: GFCross(<number of inputs>, <index of this input>).
                PhysicalPlan fep1 = new PhysicalPlan();
                ConstantExpression ce1 = new ConstantExpression(
                        new OperatorKey(scope, nodeGen.getNextNodeId(scope)), parallelism);
                ce1.setValue(inputs.size());
                ce1.setResultType(DataType.INTEGER);
                fep1.add(ce1);

                ConstantExpression ce2 = new ConstantExpression(
                        new OperatorKey(scope, nodeGen.getNextNodeId(scope)), parallelism);
                ce2.setValue(count);
                ce2.setResultType(DataType.INTEGER);
                fep1.add(ce2);

                POUserFunc gfc = new POUserFunc(
                        new OperatorKey(scope, nodeGen.getNextNodeId(scope)), parallelism,
                        Arrays.asList((PhysicalOperator)ce1,(PhysicalOperator)ce2),
                        new FuncSpec(GFCross.class.getName()
                                + "('" + poGlobal.getOperatorKey().toString() + "')"));
                gfc.addOriginalLocation(cross.getAlias(), cross.getLocation());
                gfc.setResultType(DataType.BAG);
                fep1.addAsLeaf(gfc);
                gfc.setInputs(Arrays.asList((PhysicalOperator)ce1,(PhysicalOperator)ce2));

                // Plan 2: project the whole input tuple.
                PhysicalPlan fep2 = new PhysicalPlan();
                POProject feproj = new POProject(
                        new OperatorKey(scope, nodeGen.getNextNodeId(scope)), parallelism);
                feproj.addOriginalLocation(cross.getAlias(), cross.getLocation());
                feproj.setResultType(DataType.TUPLE);
                feproj.setStar(true);
                feproj.setOverloaded(false);
                fep2.add(feproj);
                List<PhysicalPlan> fePlans = Arrays.asList(fep1, fep2);

                POForEach fe = new POForEach(
                        new OperatorKey(scope, nodeGen.getNextNodeId(scope)), parallelism, fePlans, flattenLst);
                fe.setMapSideOnly(true);
                fe.addOriginalLocation(cross.getAlias(), cross.getLocation());
                currentPlan.add(fe);
                currentPlan.connect(logToPhyMap.get(op), fe);

                POLocalRearrange physOp = new POLocalRearrange(
                        new OperatorKey(scope, nodeGen.getNextNodeId(scope)), parallelism);
                physOp.addOriginalLocation(cross.getAlias(), cross.getLocation());
                // One integer projection per input: columns 0..inputs.size()-1 form the key plans.
                List<PhysicalPlan> lrPlans = new ArrayList<PhysicalPlan>();
                for (int i = 0; i < inputs.size(); i++) {
                    PhysicalPlan lrp1 = new PhysicalPlan();
                    POProject lrproj1 = new POProject(
                            new OperatorKey(scope, nodeGen.getNextNodeId(scope)), parallelism, i);
                    lrproj1.addOriginalLocation(cross.getAlias(), cross.getLocation());
                    lrproj1.setOverloaded(false);
                    lrproj1.setResultType(DataType.INTEGER);
                    lrp1.add(lrproj1);
                    lrPlans.add(lrp1);
                }

                physOp.setCross(true);
                physOp.setIndex(count++);
                physOp.setKeyType(DataType.TUPLE);
                physOp.setPlans(lrPlans);
                physOp.setResultType(DataType.TUPLE);

                currentPlan.add(physOp);
                currentPlan.connect(fe, physOp);
                currentPlan.connect(physOp, poGlobal);
            }
        } catch (PlanException e1) {
            int errCode = 2015;
            String msg = "Invalid physical operators in the physical plan" ;
            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1);
        } catch (ExecException e) {
            int errCode = 2058;
            String msg = "Unable to set index on newly create POLocalRearrange.";
            throw new VisitorException(msg, errCode, PigException.BUG, e);
        }

        poPackage.getPkgr().setKeyType(DataType.TUPLE);
        poPackage.setResultType(DataType.TUPLE);
        poPackage.setNumInps(count);
        // Every input is marked as inner.
        boolean[] inner = new boolean[count];
        Arrays.fill(inner, true);
        poPackage.getPkgr().setInner(inner);

        // Final foreach: project columns 1..count of the package output as bags and flatten each.
        List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
        List<Boolean> flattenLst = new ArrayList<Boolean>();
        for (int i = 1; i <= count; i++) {
            PhysicalPlan fep1 = new PhysicalPlan();
            POProject feproj1 = new POProject(
                    new OperatorKey(scope, nodeGen.getNextNodeId(scope)), parallelism, i);
            feproj1.addOriginalLocation(cross.getAlias(), cross.getLocation());
            feproj1.setResultType(DataType.BAG);
            feproj1.setOverloaded(false);
            fep1.add(feproj1);
            fePlans.add(fep1);
            flattenLst.add(true);
        }

        POForEach fe = new POForEach(
                new OperatorKey(scope, nodeGen.getNextNodeId(scope)), parallelism, fePlans, flattenLst);
        fe.addOriginalLocation(cross.getAlias(), cross.getLocation());
        currentPlan.add(fe);
        try {
            currentPlan.connect(poPackage, fe);
        } catch (PlanException e1) {
            int errCode = 2015;
            String msg = "Invalid physical operators in the physical plan" ;
            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1);
        }
        logToPhyMap.put(cross, fe);
    }