public void launchPig()

in src/org/apache/pig/pen/LocalMapReduceSimulator.java [86:259]


    /**
     * Runs the MapReduce jobs compiled from a physical plan inside the current
     * process, feeding them in-memory example data instead of real input files.
     *
     * <p>The method clears {@code phyToMRMap}, compiles {@code php} into an
     * MR operator plan, marks the context with {@code pig.illustrating=true} and
     * then, while the MR plan is non-empty, compiles a batch of jobs and runs the
     * mapper (once per {@link POLoad} of the map plan) and, when a reduce plan
     * exists, the reducer of every waiting job. After each job the job's
     * physical-to-MR operator mapping is merged into {@code phyToMRMap}.
     *
     * @param php      physical plan to compile and simulate
     * @param baseData example data keyed by logical load; may be {@code null}, in
     *                 which case only data produced by earlier simulated jobs is
     *                 considered as in-memory input
     * @param lineage  not referenced by this method
     * @param attacher illustrator attacher; used to revisit the MR-compiled plans
     *                 and to look up the data bag associated with each store
     * @param eg       not referenced by this method
     * @param pc       Pig context supplying the properties used for compilation
     * @throws PigException         if compilation fails; in particular an
     *                              {@code ExecException} is thrown when the job
     *                              control compiler returns {@code null}
     * @throws IOException          propagated from deserialization or task execution
     * @throws InterruptedException propagated from running a mapper or reducer
     */
    public void launchPig(PhysicalPlan php, Map<LOLoad, DataBag> baseData,
                              LineageTracer lineage,
                              IllustratorAttacher attacher,
                              ExampleGenerator eg,
                              PigContext pc) throws PigException, IOException, InterruptedException {
        phyToMRMap.clear();
        MROperPlan mrp = launcher.compile(php, pc);

        ConfigurationValidator.validatePigProperties(pc.getProperties());
        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());

        JobControlCompiler jcc = new JobControlCompiler(pc, conf);

        // Map output of the current job; cleared at the start of every job and
        // handed to the reducer of that same job.
        List<Pair<PigNullableWritable, Writable>> intermediateData = new ArrayList<Pair<PigNullableWritable, Writable>>();

        Map<Job, MapReduceOper> jobToMroMap = jcc.getJobMroMap();
        // Store file name -> data bag; accumulated across jobs so that a later
        // job can pick up what an earlier job stored.
        HashMap<String, DataBag> output = new HashMap<String, DataBag>();
        final ArrayList<OperatorKey> emptyInpTargets = new ArrayList<OperatorKey>();
        pc.getProperties().setProperty("pig.illustrating", "true");
        String jtIdentifier = "" + System.currentTimeMillis();
        int jobId = 0;
        while (mrp.size() != 0) {
            JobControl jc = jcc.compile(mrp, "Illustrator");
            if (jc == null) {
                throw new ExecException("Native execution is not supported");
            }
            List<Job> jobs = jc.getWaitingJobs();
            for (Job job : jobs) {
                jobId++;
                Configuration jobConf = job.getJobConf();
                FileLocalizer.setInitialized(false);
                // The serialized form carries no generic type information, so the
                // cast cannot be checked by the compiler.
                @SuppressWarnings("unchecked")
                ArrayList<ArrayList<OperatorKey>> inpTargets =
                    (ArrayList<ArrayList<OperatorKey>>)
                      ObjectSerializer.deserialize(jobConf.get("pig.inpTargets"));
                intermediateData.clear();
                MapReduceOper mro = jobToMroMap.get(job);
                PhysicalOperator pack = null;
                // revisit as there are new physical operators from MR compilation
                if (!mro.mapPlan.isEmpty()) {
                    attacher.revisit(mro.mapPlan);
                }
                if (!mro.reducePlan.isEmpty()) {
                    attacher.revisit(mro.reducePlan);
                    pack = mro.reducePlan.getRoots().get(0);
                }

                List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class);

                // Start from an empty list so that a job whose map and reduce plans
                // are both empty does not fail with a NullPointerException below.
                List<POStore> stores = new ArrayList<POStore>();
                if (!mro.mapPlan.isEmpty()) {
                    stores.addAll(PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class));
                }
                if (!mro.reducePlan.isEmpty()) {
                    stores.addAll(PlanHelper.getPhysicalOperators(mro.reducePlan, POStore.class));
                }

                for (POStore store : stores) {
                    output.put(store.getSFile().getFileName(), attacher.getDataMap().get(store));
                }

                OutputAttacher oa = new OutputAttacher(mro.mapPlan, output);
                oa.visit();

                if (!mro.reducePlan.isEmpty()) {
                    oa = new OutputAttacher(mro.reducePlan, output);
                    oa.visit();
                }

                // Loads whose data is available in memory are taken out of the map
                // plan before any mapper runs; loads without in-memory data stay.
                for (POLoad ld : lds) {
                    if (findIllustratorInput(ld, output, baseData) != null) {
                        mro.mapPlan.remove(ld);
                    }
                }

                int index = 0;
                int mapTaskId = 0;
                for (POLoad ld : lds) {
                    // check newly generated data first, then the base data
                    DataBag input = findIllustratorInput(ld, output, baseData);
                    boolean needFileInput = (input == null);
                    PigSplit split = new PigSplit(null, index, needFileInput ? emptyInpTargets : inpTargets.get(index), 0);
                    ++index;
                    Mapper<Text, Tuple, PigNullableWritable, Writable> map;
                    Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context;

                    if (mro.reducePlan.isEmpty()) {
                        // map-only
                        map = new PigMapOnly.Map();
                        context = ((PigMapOnly.Map) map)
                          .getIllustratorContext(jobConf, input, intermediateData, split);
                        if (mro.isCounterOperation()) {
                            if (mro.isRowNumber()) {
                                map = new PigMapReduceCounter.PigMapCounter();
                            }
                            // NOTE(review): if a map-only job is a counter operation but
                            // not a row-number one, 'map' is still a PigMapOnly.Map and
                            // this cast throws ClassCastException - confirm that case
                            // cannot occur.
                            context = ((PigMapReduceCounter.PigMapCounter) map).getIllustratorContext(jobConf, input, intermediateData, split);
                        }
                    } else {
                        if ("true".equals(jobConf.get("pig.usercomparator"))) {
                            map = new PigMapReduce.MapWithComparator();
                        } else if (!"".equals(jobConf.get("pig.keyDistFile", ""))) {
                            map = new PigMapReduce.MapWithPartitionIndex();
                        } else {
                            map = new PigMapReduce.Map();
                        }
                        context = ((PigMapBase) map)
                          .getIllustratorContext(jobConf, input, intermediateData, split);
                    }

                    ((PigMapBase) map).setMapPlan(mro.mapPlan);
                    context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString());
                    map.run(context);
                }

                if (!mro.reducePlan.isEmpty()) {
                    if (pack instanceof POPackage) {
                        mro.reducePlan.remove(pack);
                    }
                    // reducer run
                    PigMapReduce.Reduce reduce;
                    if ("true".equals(jobConf.get("pig.usercomparator"))) {
                        reduce = new PigMapReduce.ReduceWithComparator();
                    } else {
                        reduce = new PigMapReduce.Reduce();
                    }
                    Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable>.Context
                        context = reduce.getIllustratorContext(job, intermediateData, (POPackage) pack);

                    if (mro.isCounterOperation()) {
                        reduce = new PigMapReduceCounter.PigReduceCounter();
                        context = ((PigMapReduceCounter.PigReduceCounter) reduce).getIllustratorContext(job, intermediateData, (POPackage) pack);
                    }

                    reduce.setReducePlan(mro.reducePlan);
                    context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, false, 0).toString());
                    reduce.run(context);
                }

                // Merge this job's physical-to-MR operator mapping into the
                // simulator-wide mapping.
                for (PhysicalOperator key : mro.phyToMRMap.keySet()) {
                    for (PhysicalOperator value : mro.phyToMRMap.get(key)) {
                        phyToMRMap.put(key, value);
                    }
                }
            }

            // Called with an empty job list; presumably this reports "no failed
            // jobs" so the operators just run are dropped from mrp and the loop
            // can terminate - verify against JobControlCompiler. The returned
            // count was never used by this method, so it is not kept.
            jcc.updateMROpPlan(new LinkedList<Job>());
        }

        jcc.reset();
    }

    /**
     * Looks up the in-memory data for a load operator: first among the data
     * recorded for stores ({@code output}, keyed by file name), then among the
     * base data, matching each logical load's schema file against the load's
     * file name.
     *
     * @param ld       physical load whose input is wanted
     * @param output   store file name to data bag
     * @param baseData example data keyed by logical load; may be {@code null}
     * @return the matching data bag, or {@code null} when none is available
     */
    private static DataBag findIllustratorInput(POLoad ld, Map<String, DataBag> output,
                                                Map<LOLoad, DataBag> baseData) {
        String fileName = ld.getLFile().getFileName();
        DataBag input = output.get(fileName);
        if (input == null && baseData != null) {
            for (Map.Entry<LOLoad, DataBag> entry : baseData.entrySet()) {
                if (entry.getKey().getSchemaFile().equals(fileName)) {
                    // first match wins, even if its bag is null
                    return entry.getValue();
                }
            }
        }
        return input;
    }