public void visitSkewedJoin()

in src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java [1810:1978]


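    /**
     * Compiles a POSkewedJoin into the MR plan. In outline: the first join input is
     * materialized to a temp file and fed through a sampling job that produces a partition
     * file; the join job then local-rearranges the re-loaded first input, partition-rearranges
     * the second input against that partition file, and combines the two streams through a
     * global rearrange, a package, and a flattening foreach.
     */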
    public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
        try {
            if (compiledInputs.length != 2) {
                int errCode = 2255;
                throw new VisitorException("POSkewedJoin operator has " + compiledInputs.length + " inputs. It should have 2.", errCode);
            }

            // change the plan to store the first join input into a temp file
            FileSpec fSpec = getTempFileSpec();
            MapReduceOper mro = compiledInputs[0];
            POStore str = getStore();
            str.setSFile(fSpec);
            if (!mro.isMapDone()) {
                mro.mapPlan.addAsLeaf(str);
                mro.setMapDoneSingle(true);
            } else if (mro.isMapDone() && !mro.isReduceDone()) {
                mro.reducePlan.addAsLeaf(str);
                mro.setReduceDone(true);
            } else {
                int errCode = 2022;
                String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
                throw new PlanException(msg, errCode, PigException.BUG);
            }

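            // The sampling job below reads the stored first input, estimates its key
            // distribution, and writes a partition file mapping each skewed key to a set of
            // reducers; the join job's partitioner consults this file at runtime.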
            FileSpec partitionFile = getTempFileSpec();
            int rp = op.getRequestedParallelism();

            Pair<MapReduceOper, Integer> sampleJobPair = getSkewedJoinSampleJob(op, mro, fSpec, partitionFile, rp);
            rp = sampleJobPair.second;

            // Set the parallelism of the skewed join to the value calculated by the sampling job.
            // If "parallel" was specified in the join statement, rp already equals that number;
            // otherwise, use the value the sampling job calculated from the default parallelism.
            op.setRequestedParallelism(rp);

            // load the temp file for first table as input of join
            MapReduceOper[] joinInputs = new MapReduceOper[] {startNew(fSpec, sampleJobPair.first), compiledInputs[1]};
            MapReduceOper[] rearrangeOutputs = new MapReduceOper[2];

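            // Each input gets its own rearrange stage, compiled one at a time by swapping
            // compiledInputs; the two outputs are then merged under a single global rearrange.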
            compiledInputs = new MapReduceOper[] {joinInputs[0]};
            // run POLocalRearrange for first join table
            POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
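            // Index 0 tags tuples from the first input so POPackage can route them to the correct bag.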
            try {
                lr.setIndex(0);
            } catch (ExecException e) {
                int errCode = 2058;
                String msg = "Unable to set index on newly created POLocalRearrange.";
                throw new PlanException(msg, errCode, PigException.BUG, e);
            }

            List<PhysicalOperator> l = plan.getPredecessors(op);
            MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = op.getJoinPlans();
            List<PhysicalPlan> groups = joinPlans.get(l.get(0));
            // check the type of the group keys; if there is more than one key field, the key is a TUPLE.
            byte type = DataType.TUPLE;
            if (groups.size() == 1) {
                type = groups.get(0).getLeaves().get(0).getResultType();
            }

            lr.setKeyType(type);
            lr.setPlans(groups);
            lr.setResultType(DataType.TUPLE);

            lr.visit(this);
            if(lr.getRequestedParallelism() > curMROp.requestedParallelism)
                curMROp.requestedParallelism = lr.getRequestedParallelism();
            rearrangeOutputs[0] = curMROp;

            compiledInputs = new MapReduceOper[] {joinInputs[1]};
            // if the map for current input is already closed, then start a new job
            if (compiledInputs[0].isMapDone() && !compiledInputs[0].isReduceDone()) {
                FileSpec f = getTempFileSpec();
                POStore s = getStore();
                s.setSFile(f);
                compiledInputs[0].reducePlan.addAsLeaf(s);
                compiledInputs[0].setReduceDone(true);
                compiledInputs[0] = startNew(f, compiledInputs[0]);
            }

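            // POPartitionRearrange is the skewed-join-specific rearrange: guided by the partition
            // file, it replicates a tuple whose key is skewed to every reducer handling a slice of
            // that key, and routes tuples with non-skewed keys normally.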
            // run POPartitionRearrange for second join table
            POPartitionRearrange pr =
                new POPartitionRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
            pr.setPigContext(pigContext);
            lr = pr;
            try {
                lr.setIndex(1);
            } catch (ExecException e) {
                int errCode = 2058;
                String msg = "Unable to set index on newly created POLocalRearrange.";
                throw new PlanException(msg, errCode, PigException.BUG, e);
            }

            groups = joinPlans.get(l.get(1));
            lr.setPlans(groups);
            lr.setKeyType(type);
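            // One input tuple can fan out to several reducers when its key is skewed, so the
            // result type is a BAG of rearranged tuples rather than a single TUPLE.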
            lr.setResultType(DataType.BAG);

            lr.visit(this);
            if(lr.getRequestedParallelism() > curMROp.requestedParallelism)
                curMROp.requestedParallelism = lr.getRequestedParallelism();
            rearrangeOutputs[1] = curMROp;
            compiledInputs = rearrangeOutputs;


            // create POGlobalRearrange
            POGlobalRearrange gr = new POGlobalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
            // Skewed join has its own special partitioner, driven by the partition file registered below.
            gr.setResultType(DataType.TUPLE);
            gr.visit(this);
            if(gr.getRequestedParallelism() > curMROp.requestedParallelism)
                curMROp.requestedParallelism = gr.getRequestedParallelism();
            compiledInputs = new MapReduceOper[] {curMROp};

            // create POPackage
            POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
            Packager pkgr = pkg.getPkgr();
            pkgr.setKeyType(type);
            pkg.setResultType(DataType.TUPLE);
            pkg.setNumInps(2);
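            // The inner flags carry the outer-join semantics into the package: a key whose bag
            // for an inner input is empty is dropped.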
            boolean [] inner = op.getInnerFlags();
            pkgr.setInner(inner);
            pkg.visit(this);
            compiledInputs = new MapReduceOper[] {curMROp};

            // create POForEach
            List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
            List<Boolean> flat = new ArrayList<Boolean>();

            PhysicalPlan ep;
            // Add corresponding POProjects
            for (int i=0; i < 2; i++ ) {
                ep = new PhysicalPlan();
                POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
                prj.setColumn(i+1);
                prj.setOverloaded(false);
                prj.setResultType(DataType.BAG);
                ep.add(prj);
                eps.add(ep);
                if (!inner[i]) {
                    // Substitute an empty bag on the outer side so flatten still emits a
                    // null-padded row when there is no match.
                    if (i == 0) {
                        // For right outer, also add the IsFirstReduceOfKey UDF: the streamed table
                        // is replicated to several reducers for a skewed key, so only the first of
                        // them may emit the null row.
                        CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKey.class.getName());
                    } else {
                        CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), false, IsFirstReduceOfKey.class.getName());
                    }
                }
                flat.add(true);
            }

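            // Flattening both projected bags yields the per-key cross product, i.e. the join output.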
            POForEach fe = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, eps, flat);
            fe.setResultType(DataType.TUPLE);

            fe.visit(this);

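            // Register the partition file with the join job; at job-setup time the runtime passes
            // it to the skewed partitioner that splits skewed keys across reducers.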
            curMROp.setSkewedJoinPartitionFile(partitionFile.getFileName());
            phyToMROpMap.put(op, curMROp);
        } catch (PlanException | IOException e) {
            int errCode = 2034;
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
        }

    }