public void visitSkewedJoin()

in src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java [1600:1879]


    /**
     * Compiles a {@link POSkewedJoin} into the Tez plan.
     *
     * <p>The steps, in the order the code performs them:
     * <ol>
     *   <li>The vertex of the first (left) input, {@code compiledInputs[0]}, is turned into the
     *       sampler vertex: a {@link POPoissonSample}, a {@link POForEach} that evaluates the
     *       left join-key plans plus a {@code GetMemNumRows} UDF over the whole tuple, and a
     *       local rearrange feeding the sampling aggregation vertex are appended to its plan.</li>
     *   <li>A sampling aggregation vertex is obtained from {@code getSamplingAggregationJob}
     *       using {@code PartitionSkewedKeysTez}.</li>
     *   <li>A new partitioner vertex ({@code joinJobs[0]}) is created for the left input. Depending
     *       on {@code shouldWriteDataForPartitioner}, it either receives the data from the sampler
     *       vertex over a one-to-one edge, or runs a clone of the sampler's original plan.</li>
     *   <li>A {@link POPartitionRearrangeTez} is appended to the vertex of the second (right)
     *       input ({@code joinJobs[1]}).</li>
     *   <li>A {@link POGlobalRearrange}, a {@link POPackage} and a flattening {@link POForEach}
     *       are visited to build the join vertex ({@code joinJobs[2]}).</li>
     *   <li>The sampling aggregation vertex is connected to the vertices that need the sample
     *       via broadcast edges, and {@code SkewedPartitionerTez} is set on the edges into the
     *       join vertex.</li>
     * </ol>
     *
     * @param op the skewed join operator being compiled
     * @throws VisitorException any exception raised while compiling is wrapped in a
     *         {@code TezCompilerException} with error code 2034
     */
    public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
        //TODO: handle split and connect
        try {

            // The first vertex (samplerOper) loads the left table and sends sample of join keys to
            // vertex 2 (sampler vertex) and all data to vertex 3 (partition vertex) via 1-1 edge

            // LR that transfers loaded input to partition vertex
            POLocalRearrangeTez lrTez = new POLocalRearrangeTez(OperatorKey.genOpKey(scope));
            // LR that broadcasts sampled input to sampling aggregation vertex
            POLocalRearrangeTez lrTezSample = localRearrangeFactory.create(LocalRearrangeType.NULL);

            // Sampler settings: each one falls back to its default when the property is not set.
            // parseXxx is used instead of valueOf to avoid a needless box/unbox round trip.
            int sampleRate = POPoissonSample.DEFAULT_SAMPLE_RATE;
            if (pigProperties.containsKey(PigConfiguration.PIG_POISSON_SAMPLER_SAMPLE_RATE)) {
                sampleRate = Integer.parseInt(pigProperties.getProperty(PigConfiguration.PIG_POISSON_SAMPLER_SAMPLE_RATE));
            }
            float heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
            if (pigProperties.containsKey(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE)) {
                heapPerc = Float.parseFloat(pigProperties.getProperty(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE));
            }
            long totalMemory = -1;
            if (pigProperties.containsKey(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEM)) {
                totalMemory = Long.parseLong(pigProperties.getProperty(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEM));
            }
            POPoissonSample poSample = new POPoissonSample(new OperatorKey(scope,nig.getNextNodeId(scope)),
                    -1, sampleRate, heapPerc, totalMemory);

            TezOperator samplerOper = compiledInputs[0];
            boolean writeDataForPartitioner = shouldWriteDataForPartitioner(samplerOper);

            // lrTez must be attached before the sample operator is appended: either directly to
            // the sampler plan, or to a clone of the plan taken before any sampling operators
            // are added (that clone later becomes the partitioner vertex's plan).
            PhysicalPlan partitionerPlan = null;
            if (writeDataForPartitioner) {
                samplerOper.plan.addAsLeaf(lrTez);
            } else {
                partitionerPlan = samplerOper.plan.clone();
                partitionerPlan.addAsLeaf(lrTez);
            }

            samplerOper.plan.addAsLeaf(poSample);
            samplerOper.markSampler();

            // Join-key plans of the first (left) predecessor of the join.
            MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = op.getJoinPlans();
            List<PhysicalOperator> l = plan.getPredecessors(op);
            List<PhysicalPlan> groups = joinPlans.get(l.get(0));
            // One 'false' entry per key plan; handed to the POSort below.
            List<Boolean> ascCol = new ArrayList<Boolean>();
            for (int i=0; i< groups.size(); i++) {
                ascCol.add(false);
            }

            // Set up transform plan to get keys and memory size of input
            // tuples. It first adds all the plans to get key columns.
            List<PhysicalPlan> transformPlans = new ArrayList<PhysicalPlan>();
            transformPlans.addAll(groups);

            // Then it adds a column for memory size
            POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
            prjStar.setResultType(DataType.TUPLE);
            prjStar.setStar(true);

            List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
            ufInps.add(prjStar);

            // Expression plan: project(*) -> GetMemNumRows
            PhysicalPlan ep = new PhysicalPlan();
            POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)),
                    -1, ufInps, new FuncSpec(GetMemNumRows.class.getName(), (String[])null));
            uf.setResultType(DataType.TUPLE);
            ep.add(uf);
            ep.add(prjStar);
            ep.connect(prjStar, uf);

            transformPlans.add(ep);

            List<Boolean> flat1 = new ArrayList<Boolean>();
            List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();

            // Only the last plan (the GetMemNumRows tuple) is flattened.
            for (int i=0; i<transformPlans.size(); i++) {
                eps1.add(transformPlans.get(i));
                flat1.add(i == transformPlans.size() - 1);
            }

            // This foreach will pick the sort key columns from the POPoissonSample output
            POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),
                    -1, eps1, flat1);
            samplerOper.plan.addAsLeaf(nfe1);
            samplerOper.plan.addAsLeaf(lrTezSample);
            samplerOper.setClosed(true);

            // Fall back to the default parallelism when none was requested on the join.
            int rp = op.getRequestedParallelism();
            if (rp == -1) {
                rp = pigContext.defaultParallel;
            }

            POSort sort = new POSort(op.getOperatorKey(), rp,
                    null, groups, ascCol, null);
            // NOTE(review): this literal is presumably the same key as
            // PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE read above - confirm and use the constant.
            String per = pigProperties.getProperty("pig.skewedjoin.reduce.memusage",
                    String.valueOf(PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE));
            String mc = pigProperties.getProperty("pig.skewedjoin.reduce.maxtuple", "0");

            // The pair holds the sampling aggregation vertex and the parallelism to use from here on.
            Pair<TezOperator, Integer> sampleJobPair = getSamplingAggregationJob(sort, rp, null,
                    PartitionSkewedKeysTez.class.getName(), new String[]{per, mc});
            rp = sampleJobPair.second;

            // joinJobs[0]: left partitioner vertex, joinJobs[1]: right input vertex,
            // joinJobs[2]: join vertex (filled in after the global rearrange is visited).
            TezOperator[] joinJobs = new TezOperator[] {null, compiledInputs[1], null};
            TezOperator[] joinInputs = new TezOperator[] {compiledInputs[0], compiledInputs[1]};
            TezOperator[] rearrangeOutputs = new TezOperator[2];

            compiledInputs = new TezOperator[] {joinInputs[0]};

            // Add a partitioner vertex that partitions the data based on the quantiles from sample vertex.
            curTezOp = getTezOp();
            tezPlan.add(curTezOp);
            joinJobs[0] = curTezOp;

            try {
                lrTez.setIndex(0);
            } catch (ExecException e) {
                int errCode = 2058;
                String msg = "Unable to set index on newly created POLocalRearrange.";
                throw new PlanException(msg, errCode, PigException.BUG, e);
            }

            // Check the type of group keys, if there are more than one field, the key is TUPLE.
            byte type = DataType.TUPLE;
            if (groups.size() == 1) {
                type = groups.get(0).getLeaves().get(0).getResultType();
            }
            lrTez.setKeyType(type);
            lrTez.setPlans(groups);
            lrTez.setResultType(DataType.TUPLE);

            POLocalRearrangeTez partitionerLR = null;
            if (!writeDataForPartitioner) {
                // Read input from hdfs again
                joinJobs[0].plan = partitionerPlan;
                partitionerLR = lrTez;
                lrTez.setSkewedJoin(true);
            } else {
                // Add a POIdentityInOutTez which just passes data through from sampler vertex
                partitionerLR = new POIdentityInOutTez(
                        OperatorKey.genOpKey(scope),
                        lrTez,
                        samplerOper.getOperatorKey().toString());
                partitionerLR.setSkewedJoin(true);
                joinJobs[0].plan.addAsLeaf(partitionerLR);

                // Connect the sampler vertex to the partitioner vertex
                lrTez.setOutputKey(joinJobs[0].getOperatorKey().toString());
                TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, samplerOper, joinJobs[0]);
                // TODO: PIG-3775 unsorted shuffle
                edge.dataMovementType = DataMovementType.ONE_TO_ONE;
                edge.outputClassName = UnorderedKVOutput.class.getName();
                edge.inputClassName = UnorderedKVInput.class.getName();
                // If samplerOper.requestedParallelism changes based on no. of input splits
                // it will reflect for joinJobs[0] so that 1-1 edge will work.
                joinJobs[0].setRequestedParallelismByReference(samplerOper);
            }
            joinJobs[0].setClosed(true);
            joinJobs[0].markSampleBasedPartitioner();
            joinJobs[0].setUseMRMapSettings(samplerOper.isUseMRMapSettings());

            rearrangeOutputs[0] = joinJobs[0];

            compiledInputs = new TezOperator[] {joinInputs[1]};

            // Run POPartitionRearrange for second join table. Note we set the
            // parallelism of POPartitionRearrange to -1, so its parallelism
            // will be determined by the size of streaming table.
            POPartitionRearrangeTez pr =
                    new POPartitionRearrangeTez(OperatorKey.genOpKey(scope));
            try {
                pr.setIndex(1);
            } catch (ExecException e) {
                int errCode = 2058;
                String msg = "Unable to set index on newly created POPartitionRearrange.";
                throw new PlanException(msg, errCode, PigException.BUG, e);
            }

            // From here on 'groups' holds the join-key plans of the second (right) input.
            groups = joinPlans.get(l.get(1));
            pr.setPlans(groups);
            pr.setKeyType(type);
            pr.setSkewedJoin(true);
            pr.setResultType(DataType.TUPLE);
            joinJobs[1].plan.addAsLeaf(pr);
            joinJobs[1].setClosed(true);
            rearrangeOutputs[1] = joinJobs[1];

            compiledInputs = rearrangeOutputs;

            // Create POGlobalRearrange
            POGlobalRearrange gr =
                    new POGlobalRearrange(OperatorKey.genOpKey(scope), rp);
            // Skewed join has its own special partitioner
            gr.setResultType(DataType.TUPLE);
            gr.visit(this);
            // The visit leaves the join vertex in curTezOp.
            joinJobs[2] = curTezOp;
            joinJobs[2].setRequestedParallelism(rp);

            compiledInputs = new TezOperator[] {joinJobs[2]};

            // Create POPackage
            POPackage pkg = getPackage(2, type);
            pkg.setResultType(DataType.TUPLE);
            boolean [] inner = op.getInnerFlags();
            pkg.getPkgr().setInner(inner);
            pkg.visit(this);

            compiledInputs = new TezOperator[] {curTezOp};

            // Create POForEach
            List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
            List<Boolean> flat = new ArrayList<Boolean>();

            boolean containsRightOuter = false;
            // Add corresponding POProjects
            for (int i=0; i < 2; i++) {
                ep = new PhysicalPlan();
                POProject prj = new POProject(OperatorKey.genOpKey(scope));
                // Column 0 of the packaged tuple is skipped; the input bags start at column 1.
                prj.setColumn(i+1);
                prj.setOverloaded(false);
                prj.setResultType(DataType.BAG);
                ep.add(prj);
                eps.add(ep);
                if (!inner[i]) {
                    // Add an empty bag for outer join. For right outer, add IsFirstReduceOfKeyTez UDF as well
                    boolean leftInput = (i == 0);
                    if (leftInput) {
                        // A non-inner left input means this is a right outer join.
                        containsRightOuter = true;
                    }
                    CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), leftInput, IsFirstReduceOfKeyTez.class.getName());
                }
                flat.add(true);
            }

            POForEach fe =
                    new POForEach(OperatorKey.genOpKey(scope), -1, eps, flat);
            fe.setResultType(DataType.TUPLE);
            fe.visit(this);

            // Connect vertices
            lrTezSample.setOutputKey(sampleJobPair.first.getOperatorKey().toString());
            partitionerLR.setOutputKey(joinJobs[2].getOperatorKey().toString());
            pr.setOutputKey(joinJobs[2].getOperatorKey().toString());

            TezCompilerUtil.connect(tezPlan, samplerOper, sampleJobPair.first);

            POValueOutputTez sampleOut = (POValueOutputTez) sampleJobPair.first.plan.getLeaves().get(0);
            for (int i = 0; i <= 2; i++) {
                // joinJobs[0] and joinJobs[1] always get the sample; the join vertex (i == 2)
                // only when the join contains a right outer side.
                if (i != 2 || containsRightOuter) {
                    // We need to send sample to left relation partitioner vertex, right relation load vertex,
                    // and join vertex (IsFirstReduceOfKey in join vertex need sample file as well)
                    joinJobs[i].setSampleOperator(sampleJobPair.first);

                    // Configure broadcast edges for distribution map
                    TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, sampleJobPair.first, joinJobs[i]);
                    TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
                    sampleOut.addOutputKey(joinJobs[i].getOperatorKey().toString());
                }

                // Configure skewed partitioner for join
                if (i != 2) {
                    // Edge from the i-th rearrange vertex into the join vertex.
                    TezEdgeDescriptor edge = joinJobs[2].inEdges.get(joinJobs[i].getOperatorKey());
                    edge.partitionerClass = SkewedPartitionerTez.class;
                }
            }

            joinJobs[2].markSkewedJoin();
            sampleJobPair.first.setSortOperator(joinJobs[2]);

            // Parallelism is still unset (-1) after sampling setup.
            if (rp == -1) {
                sampleJobPair.first.setNeedEstimatedQuantile(true);
            }

            phyToTezOpMap.put(op, curTezOp);
        } catch (Exception e) {
            // Every failure, including the PlanExceptions thrown above, is reported as a compiler bug.
            int errCode = 2034;
            String msg = "Error compiling operator " + op.getClass().getSimpleName();
            throw new TezCompilerException(msg, errCode, PigException.BUG, e);
        }
    }