public static void setMapWork()

in ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java [496:755]
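
Builds the MapWork plan for a single table scan: the method prunes partitions (or accepts a pre-pruned list), records each input path together with its PartitionDesc under the given alias, applies the global-limit input-pruning optimization where possible, and registers the scan either as regular map-reduce work or as local fetch work.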


  public static void setMapWork(MapWork plan, ParseContext parseCtx, Set<ReadEntity> inputs,
      PrunedPartitionList partsList, TableScanOperator tsOp, String alias_id,
      HiveConf conf, boolean local) throws SemanticException {
    ArrayList<Path> partDir = new ArrayList<Path>();
    ArrayList<PartitionDesc> partDesc = new ArrayList<PartitionDesc>();
    boolean isFullAcidTable = false;

    Path tblDir = null;
    plan.setNameToSplitSample(parseCtx.getNameToSplitSample());
    // we also collect table stats while collecting column stats.
    if (parseCtx.getAnalyzeRewrite() != null) {
      plan.setGatheringStats(true);
    }

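    // If the caller did not supply a pruned partition list, compute one here:
    // non-native tables with partition support are wrapped in a single dummy
    // partition, while everything else goes through the partition pruner.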
    if (partsList == null) {
      Table tab = tsOp.getConf().getTableMetadata();
      if (tab.hasNonNativePartitionSupport()) {
        partsList = new PrunedPartitionList(tab, null, Sets.newHashSet(new DummyPartition(tab)),
            Collections.emptyList(), false);
      } else {
        partsList = PartitionPruner.prune(tsOp, parseCtx, alias_id);
        isFullAcidTable = tsOp.getConf().isFullAcidTable();
      }
    }

    // Generate the map work for this alias_id
    // pass both confirmed and unknown partitions through the map-reduce
    // framework
    Set<Partition> parts = partsList.getPartitions();
    TableDesc tableSpec = Utilities.getTableDesc(tsOp.getConf().getTableMetadata());
    PartitionDesc aliasPartnDesc = null;
    try {
      if (!parts.isEmpty()) {
        aliasPartnDesc = Utilities.getPartitionDesc(parts.iterator().next(), tableSpec);
      }
    } catch (HiveException e) {
      LOG.error("Failed getPartitionDesc", e);
      throw new SemanticException(e.getMessage(), e);
    }

    // The table does not have any partitions
    if (aliasPartnDesc == null) {
      aliasPartnDesc = new PartitionDesc(tableSpec, null);
    }

    Map<String, String> props = tsOp.getConf().getOpProps();
    if (props != null) {
      Properties target = aliasPartnDesc.getProperties();
      target.putAll(props);
    }

    plan.getAliasToPartnInfo().put(alias_id, aliasPartnDesc);

    long sizeNeeded = Integer.MAX_VALUE;
    int fileLimit = -1;
    if (parseCtx.getGlobalLimitCtx().isEnable()) {
      if (isFullAcidTable) {
        LOG.info("Skipping Global Limit optimization for an ACID table");
        parseCtx.getGlobalLimitCtx().disableOpt();
      } else {
        long sizePerRow = HiveConf.getLongVar(parseCtx.getConf(),
            HiveConf.ConfVars.HIVE_LIMIT_MAX_ROW_SIZE);
        sizeNeeded = (parseCtx.getGlobalLimitCtx().getGlobalOffset()
            + parseCtx.getGlobalLimitCtx().getGlobalLimit()) * sizePerRow;
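        // e.g. a query with LIMIT 100 (offset 0) and hive.limit.row.max.size
        // set to 100000 yields sizeNeeded = 100 * 100000 = 10,000,000 bytes.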
        // For the optimization that reduces the number of input files, we
        // limit the number of files allowed. If more than a certain number of
        // files would have to be selected, we skip the optimization, since
        // having too many files as inputs can cause unpredictable latency and
        // is not necessarily cheaper.
        fileLimit =
            HiveConf.getIntVar(parseCtx.getConf(), HiveConf.ConfVars.HIVE_LIMIT_OPT_LIMIT_FILE);

        if (sizePerRow <= 0 || fileLimit <= 0) {
          LOG.info("Skip optimization to reduce input size of 'limit'");
          parseCtx.getGlobalLimitCtx().disableOpt();
        } else if (parts.isEmpty()) {
          LOG.info("Empty input: skip limit optimization");
        } else {
          LOG.info("Try to reduce input size for 'limit'; sizeNeeded: {}, file limit: {}",
              sizeNeeded, fileLimit);
        }
      }
    }
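    // Flags for the limit-prune loop below: only the first partition is
    // probed, emptyInput tracks whether any file was selected, and a single
    // partition is special-cased when all of its files are needed.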
    boolean isFirstPart = true;
    boolean emptyInput = true;
    boolean singlePartition = (parts.size() == 1);

    // Track the dependencies for the view. Consider a query like: select * from V;
    // where V is a view of the form: select * from T
    // The dependencies should include V at depth 0, and T at depth 1 (inferred).
    Map<String, ReadEntity> viewToInput = parseCtx.getViewAliasToInput();
    ReadEntity parentViewInfo = PlanUtils.getParentViewInfo(alias_id, viewToInput);

    // The table itself should also be considered part of the inputs, even if
    // it is a partitioned table, regardless of whether any partition is
    // selected.

    // This read entity is a direct read, not an indirect read (an indirect
    // read happens when the entity is read as a dependency of a view).
    boolean isDirectRead = (parentViewInfo == null);
    TableDesc tblDesc = null;
    boolean initTableDesc = false;

    PlanUtils.addPartitionInputs(parts, inputs, parentViewInfo, isDirectRead);

    for (Partition part: parts) {
      // Eventually the properties will have to come from the partition, as
      // opposed to the table, in order to support versioning.
      Path[] paths = null;
      SampleDesc sampleDescr = parseCtx.getOpToSamplePruner().get(tsOp);

      // Lookup list bucketing pruner
      Map<String, ExprNodeDesc> partToPruner = parseCtx.getOpToPartToSkewedPruner().get(tsOp);
      ExprNodeDesc listBucketingPruner = (partToPruner != null) ? partToPruner.get(part.getName())
          : null;

      if (sampleDescr != null) {
        assert (listBucketingPruner == null) : "Sampling and list bucketing can't coexist.";
        paths = SamplePruner.prune(part, sampleDescr);
        parseCtx.getGlobalLimitCtx().disableOpt();
      } else if (listBucketingPruner != null) {
        assert (sampleDescr == null) : "Sampling and list bucketing can't coexist.";
        /* Use the list bucketing pruner's paths. */
        paths = ListBucketingPruner.prune(parseCtx, part, listBucketingPruner);
      } else {
        // Only the first partition is probed; if it does not contain enough
        // data, we fall back to normal (full-input) mode.
        if (parseCtx.getGlobalLimitCtx().isEnable()) {
          if (isFirstPart) {
            long sizeLeft = sizeNeeded;
            ArrayList<Path> retPathList = new ArrayList<Path>();
            SamplePruner.LimitPruneRetStatus status = SamplePruner.limitPrune(part, sizeLeft,
                fileLimit, retPathList);
            if (status.equals(SamplePruner.LimitPruneRetStatus.NoFile)) {
              continue;
            } else if (status.equals(SamplePruner.LimitPruneRetStatus.NotQualify)) {
              LOG.info("Use full input -- first {} files are more than {} bytes",
                  fileLimit, sizeNeeded);

              parseCtx.getGlobalLimitCtx().disableOpt();

            } else {
              emptyInput = false;
              paths = retPathList.toArray(new Path[0]);
              if (status.equals(SamplePruner.LimitPruneRetStatus.NeedAllFiles) && singlePartition) {
                // If all files are needed to meet the size limit, we disable
                // the optimization. This usually happens for an empty
                // table/partition, or one with only a single file. Disabling
                // the optimization avoids retrying the query when there are
                // not enough rows.
                parseCtx.getGlobalLimitCtx().disableOpt();
              }
            }
            isFirstPart = false;
          } else {
            paths = new Path[0];
          }
        }
        if (!parseCtx.getGlobalLimitCtx().isEnable()) {
          paths = part.getPath();
        }
      }

      // Is it a partitioned table?
      if (!part.getTable().isPartitioned()) {
        assert (tblDir == null);
        tblDir = paths[0];
      }
      // Initialize the table descriptor once, from this partition's table.
      if (!initTableDesc) {
        tblDesc = Utilities.getTableDesc(part.getTable());
        initTableDesc = true;
      }

      if (props != null) {
        Properties target = tblDesc.getProperties();
        target.putAll(props);
      }

      for (Path p : paths) {
        if (p == null) {
          continue;
        }
        LOG.debug("Adding {} of table {}", p, alias_id);

        partDir.add(p);
        try {
          if (part.getTable().isPartitioned()) {
            partDesc.add(Utilities.getPartitionDesc(part, tblDesc));
          } else {
            partDesc.add(Utilities.getPartitionDescFromTableDesc(tblDesc, part, false));
          }
        } catch (HiveException e) {
          LOG.error("Failed to add partition description", e);
          throw new SemanticException(e.getMessage(), e);
        }
      }
    }

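    // No partition contributed any input file, so the limit optimization
    // cannot apply.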
    if (emptyInput) {
      parseCtx.getGlobalLimitCtx().disableOpt();
    }

    Utilities.addSchemaEvolutionToTableScanOperator(partsList.getSourceTable(), tsOp);

    Iterator<Path> iterPath = partDir.iterator();
    Iterator<PartitionDesc> iterPartnDesc = partDesc.iterator();

    if (!local) {
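      // Regular map-reduce case: register every input path with its alias and
      // partition descriptor, and attach the table scan operator as the work
      // for this alias.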
      while (iterPath.hasNext()) {
        assert iterPartnDesc.hasNext();
        Path path = iterPath.next();

        PartitionDesc prtDesc = iterPartnDesc.next();

        // Add the path to alias mapping
        plan.addPathToAlias(path, alias_id);
        plan.addPathToPartitionInfo(path, prtDesc);
        LOG.debug("Information added for path {}", path);
      }

      assert plan.getAliasToWork().get(alias_id) == null;
      plan.getAliasToWork().put(alias_id, tsOp);
    } else {
      // populate local work if needed
      MapredLocalWork localPlan = plan.getMapRedLocalWork();
      if (localPlan == null) {
        localPlan = new MapredLocalWork(
            new LinkedHashMap<String, Operator<? extends OperatorDesc>>(),
            new LinkedHashMap<String, FetchWork>());
      }

      assert localPlan.getAliasToWork().get(alias_id) == null;
      assert localPlan.getAliasToFetchWork().get(alias_id) == null;
      localPlan.getAliasToWork().put(alias_id, tsOp);
      if (tblDir == null) {
        tblDesc = Utilities.getTableDesc(partsList.getSourceTable());
        localPlan.getAliasToFetchWork().put(
            alias_id,
            new FetchWork(partDir, partDesc, tblDesc));
      } else {
        localPlan.getAliasToFetchWork().put(alias_id,
            new FetchWork(tblDir, tblDesc));
      }
      plan.setMapRedLocalWork(localPlan);
    }
  }
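
A minimal sketch of a typical call site (the surrounding variables mapWork, parseCtx, inputs, tsOp, aliasId and conf are hypothetical and assumed to be in scope; passing null for partsList triggers the pruning fallback at the top of the method):

  MapWork mapWork = new MapWork();
  // partsList == null: setMapWork runs the PartitionPruner itself (see above).
  // local == false: build regular map-reduce work rather than local fetch work.
  GenMapRedUtils.setMapWork(mapWork, parseCtx, inputs, null, tsOp, aliasId, conf, false);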