public void compile()

in ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java [126:453]


  /**
   * Compiles the operator plan carried by {@code pCtx} into a tree of executable tasks.
   *
   * <p>In order, this method:
   * <ol>
   *   <li>returns early when {@code pCtx} already carries a fetch task, after selecting the
   *       list-sink output formatter for it;</li>
   *   <li>runs {@code optimizeOperatorPlan} unless the statement is an analyze command;</li>
   *   <li>for a plain query creates a {@code FetchTask} (and returns immediately when the outer
   *       query limit is 0); otherwise, unless this is an analyze rewrite, creates one
   *       {@code MoveWork} task per load-table / load-file descriptor;</li>
   *   <li>calls {@code generateTaskTree} and post-processes every root task;</li>
   *   <li>for storage handlers that report direct insert, places the create-table /
   *       create-materialized-view DDL task in front of the existing root tasks;</li>
   *   <li>runs {@code optimizeTaskPlan}, wires the column statistics tasks, decides the
   *       execution mode and appends the CTAS / materialized view DDL tasks;</li>
   *   <li>applies the global limit settings and runs the final map-work chores.</li>
   * </ol>
   *
   * @param pCtx parse context holding the operator plan, query properties and load descriptors
   * @param rootTasks list of root tasks; handed to {@code generateTaskTree} and rewritten in
   *     place here when a DDL task has to become the single root
   * @param inputs read entities, forwarded to task generation and to the DDL works created here
   * @param outputs write entities, forwarded to task generation and to the DDL works created here
   * @throws SemanticException if a storage handler cannot be loaded, the load work of a query is
   *     not exactly one load-file descriptor, more than one load file is present for a CTAS or
   *     materialized view, or the root task for column statistics cannot be determined
   */
  public void compile(final ParseContext pCtx,
      final List<Task<?>> rootTasks,
      final Set<ReadEntity> inputs, final Set<WriteEntity> outputs) throws SemanticException {

    Context ctx = pCtx.getContext();
    GlobalLimitCtx globalLimitCtx = pCtx.getGlobalLimitCtx();
    List<Task<MoveWork>> mvTask = new ArrayList<>();

    List<LoadTableDesc> loadTableWork = pCtx.getLoadTableWork();
    List<LoadFileDesc> loadFileWork = pCtx.getLoadFileWork();

    boolean isCStats = pCtx.getQueryProperties().isAnalyzeRewrite();
    int outerQueryLimit = pCtx.getQueryProperties().getOuterQueryLimit();

    // Ask the storage handler (of the table or materialized view being created, if any)
    // whether it performs a direct insert.
    boolean directInsert = false;
    if (pCtx.getCreateTable() != null && pCtx.getCreateTable().getStorageHandler() != null) {
      try {
        directInsert =
            HiveUtils.getStorageHandler(conf, pCtx.getCreateTable().getStorageHandler()).directInsert();
      } catch (HiveException e) {
        // Keep the original exception as the cause so the stack trace is not lost.
        throw new SemanticException("Failed to load storage handler:  " + e.getMessage(), e);
      }
    }
    if (pCtx.getCreateViewDesc() != null && pCtx.getCreateViewDesc().getStorageHandler() != null) {
      try {
        directInsert =
            HiveUtils.getStorageHandler(conf, pCtx.getCreateViewDesc().getStorageHandler()).directInsert();
      } catch (HiveException e) {
        // Keep the original exception as the cause so the stack trace is not lost.
        throw new SemanticException("Failed to load storage handler:  " + e.getMessage(), e);
      }
    }

    if (pCtx.getFetchTask() != null) {
      if (pCtx.getFetchTask().getTblDesc() == null) {
        return;
      }
      pCtx.getFetchTask().getWork().setHiveServerQuery(SessionState.get().isHiveServerQuery());
      TableDesc resultTab = pCtx.getFetchTask().getTblDesc();
      // If the serializer is ThriftJDBCBinarySerDe, then it requires that NoOpFetchFormatter be used. But when it isn't,
      // then either the ThriftFormatter or the DefaultFetchFormatter should be used.
      // The constant is the receiver of equalsIgnoreCase so a missing serde class name cannot cause an NPE.
      if (!ThriftJDBCBinarySerDe.class.getName().equalsIgnoreCase(resultTab.getSerdeClassName())) {
        if (SessionState.get().isHiveServerQuery()) {
          conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, ThriftFormatter.class.getName());
        } else {
          String formatterName = conf.get(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER);
          if (formatterName == null || formatterName.isEmpty()) {
            conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, DefaultFetchFormatter.class.getName());
          }
        }
      }

      return;
    }

    if (pCtx.getQueryProperties().isAnalyzeCommand()) {
      // The message is only emitted when the optimization is really skipped.
      LOG.debug("Skipping optimize operator plan for analyze command.");
    } else {
      optimizeOperatorPlan(pCtx);
    }

    /*
     * In case of a select, use a fetch task instead of a move task.
     * If the select is from analyze table column rewrite, don't create a fetch task. Instead create
     * a column stats task later.
     */
    if (pCtx.getQueryProperties().isQuery() && !isCStats) {
      if ((!loadTableWork.isEmpty()) || (loadFileWork.size() != 1)) {
        throw new SemanticException(ErrorMsg.INVALID_LOAD_TABLE_FILE_WORK.getMsg());
      }

      LoadFileDesc loadFileDesc = loadFileWork.get(0);

      String cols = loadFileDesc.getColumns();
      String colTypes = loadFileDesc.getColumnTypes();

      TableDesc resultTab = pCtx.getFetchTableDesc();
      boolean shouldSetOutputFormatter = false;
      if (resultTab == null) {
        ResultFileFormat resFileFormat = conf.getResultFileFormat();
        String fileFormat;
        Class<? extends Deserializer> serdeClass;
        if (SessionState.get().getIsUsingThriftJDBCBinarySerDe()
            && resFileFormat == ResultFileFormat.SEQUENCEFILE) {
          fileFormat = resFileFormat.toString();
          serdeClass = ThriftJDBCBinarySerDe.class;
          shouldSetOutputFormatter = true;
        } else if (resFileFormat == ResultFileFormat.SEQUENCEFILE) {
          // file format is changed so that IF file sink provides list of files to fetch from (instead
          // of whole directory) list status is done on files (which is what HiveSequenceFileInputFormat does)
          fileFormat = "HiveSequenceFile";
          serdeClass = LazySimpleSerDe.class;
        } else {
          // All other cases we use the defined file format and LazySimpleSerde
          fileFormat = resFileFormat.toString();
          serdeClass = LazySimpleSerDe.class;
        }
        resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat, serdeClass);
      } else {
        // Null-safe: a table descriptor without a serialization lib property is simply "not thrift".
        shouldSetOutputFormatter = ThriftJDBCBinarySerDe.class.getName().equalsIgnoreCase(
            resultTab.getProperties().getProperty(serdeConstants.SERIALIZATION_LIB));
      }

      if (shouldSetOutputFormatter) {
        // Set the fetch formatter to be a no-op for the ListSinkOperator, since we will
        // read formatted thrift objects from the output SequenceFile written by Tasks.
        conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, NoOpFetchFormatter.class.getName());
      }

      FetchWork fetch = new FetchWork(loadFileDesc.getSourcePath(), resultTab, outerQueryLimit);
      boolean isHiveServerQuery = SessionState.get().isHiveServerQuery();
      fetch.setHiveServerQuery(isHiveServerQuery);
      fetch.setSource(pCtx.getFetchSource());
      fetch.setSink(pCtx.getFetchSink());
      if (isHiveServerQuery
          && resultTab != null
          && ThriftJDBCBinarySerDe.class.getName().equalsIgnoreCase(resultTab.getSerdeClassName())
          && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) {
        fetch.setIsUsingThriftJDBCBinarySerDe(true);
      } else {
        fetch.setIsUsingThriftJDBCBinarySerDe(false);
      }

      // The idea here is to keep an object reference both in FileSink and in FetchTask for list of files
      // to be fetched. During Job close file sink will populate the list and fetch task later will use it
      // to fetch the results.
      Collection<Operator<?>> tableScanOps =
          Lists.<Operator<?>>newArrayList(pCtx.getTopOps().values());
      Set<FileSinkOperator> fsOps = OperatorUtils.findOperators(tableScanOps, FileSinkOperator.class);
      if (fsOps != null && fsOps.size() == 1) {
        FileSinkOperator op = fsOps.iterator().next();
        Set<FileStatus> filesToFetch = new HashSet<>();
        op.getConf().setFilesToFetch(filesToFetch);
        fetch.setFilesToFetch(filesToFetch);
      }

      pCtx.setFetchTask((FetchTask) TaskFactory.get(fetch));

      // For the FetchTask, the limit optimization requires we fetch all the rows
      // in memory and count how many rows we get. It's not practical if the
      // limit factor is too big
      int fetchLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_LIMIT_OPT_MAX_FETCH);
      if (globalLimitCtx.isEnable() && globalLimitCtx.getGlobalLimit() > fetchLimit) {
        LOG.info("For FetchTask, LIMIT " + globalLimitCtx.getGlobalLimit() + " > " + fetchLimit
            + ". Doesn't qualify limit optimization.");
        globalLimitCtx.disableOpt();
      }
      if (outerQueryLimit == 0) {
        // Believe it or not, some tools do generate queries with limit 0 and then expect
        // the query to run quickly. Let's meet their requirement.
        LOG.info("Limit 0. No query execution needed.");
        return;
      }
    } else if (!isCStats) {
      for (LoadTableDesc ltd : loadTableWork) {
        Task<MoveWork> tsk = TaskFactory
            .get(new MoveWork(pCtx.getQueryProperties().isCTAS() && pCtx.getCreateTable().isExternal(),
                    null, null, ltd, null, false));
        mvTask.add(tsk);
      }

      boolean oneLoadFileForCtas = true;
      for (LoadFileDesc lfd : loadFileWork) {
        if (pCtx.getQueryProperties().isCTAS() || pCtx.getQueryProperties().isMaterializedView()) {
          if (!oneLoadFileForCtas) { // should not have more than 1 load file for CTAS.
            throw new SemanticException(
                "One query is not expected to contain multiple CTAS loads statements");
          }
          setLoadFileLocation(pCtx, lfd);
          oneLoadFileForCtas = false;
        }
        mvTask.add(TaskFactory.get(
            new MoveWork(pCtx.getQueryProperties().isCTAS() && pCtx.getCreateTable().isExternal(), null, null, null,
                lfd, false)));
      }
    }

    generateTaskTree(rootTasks, pCtx, mvTask, inputs, outputs);

    // For each task, set the key descriptor for the reducer
    for (Task<?> rootTask : rootTasks) {
      GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask);
    }

    // If a task contains an operator which instructs bucketizedhiveinputformat
    // to be used, please do so
    for (Task<?> rootTask : rootTasks) {
      setInputFormat(rootTask);
    }

    if (directInsert) {
      if (pCtx.getCreateTable() != null) {
        CreateTableDesc crtTblDesc = pCtx.getCreateTable();
        crtTblDesc.validate(conf);
        Task<?> crtTask = TaskFactory.get(new DDLWork(inputs, outputs, crtTblDesc));
        // Link every existing root below the create-table task first, and only then swap the
        // root list. Mutating rootTasks while iterating over it would drop all roots but the first.
        if (!rootTasks.isEmpty()) {
          for (Task<?> rootTask : rootTasks) {
            crtTask.addDependentTask(rootTask);
          }
          rootTasks.clear();
          rootTasks.add(crtTask);
        }
      } else if (pCtx.getCreateViewDesc() != null) {
        CreateMaterializedViewDesc createMaterializedViewDesc = pCtx.getCreateViewDesc();
        Task<?> crtTask = TaskFactory.get(new DDLWork(inputs, outputs, createMaterializedViewDesc));
        MaterializedViewUpdateDesc materializedViewUpdateDesc = new MaterializedViewUpdateDesc(
                createMaterializedViewDesc.getViewName(), createMaterializedViewDesc.isRewriteEnabled(), false, false);
        Task<?> updateTask = TaskFactory.get(new DDLWork(inputs, outputs, materializedViewUpdateDesc));
        crtTask.addDependentTask(updateTask);
        // Same as above: chain all existing roots behind the update task, then replace the roots.
        if (!rootTasks.isEmpty()) {
          for (Task<?> rootTask : rootTasks) {
            updateTask.addDependentTask(rootTask);
          }
          rootTasks.clear();
          rootTasks.add(crtTask);
        }
      }
    }

    optimizeTaskPlan(rootTasks, pCtx, ctx);

    /*
     * If the query was the result of analyze table column compute statistics rewrite, create
     * a column stats task instead of a fetch task to persist stats to the metastore.
     * As per HIVE-15903, we will also collect table stats when user computes column stats.
     * That means, if isCStats || !pCtx.getColumnStatsAutoGatherContexts().isEmpty()
     * We need to collect table stats
     * if isCStats, we need to include a basic stats task
     * else it is ColumnStatsAutoGather, which should have a move task with a stats task already.
     */
    if (isCStats || !pCtx.getColumnStatsAutoGatherContexts().isEmpty()) {
      // map from tablename to task (ColumnStatsTask which includes a BasicStatsTask)
      Map<String, StatsTask> map = new LinkedHashMap<>();
      if (isCStats) {
        if (rootTasks == null || rootTasks.size() != 1 || pCtx.getTopOps() == null
            || pCtx.getTopOps().size() != 1) {
          throw new SemanticException("Can not find correct root task!");
        }
        try {
          Task<?> root = rootTasks.iterator().next();
          StatsTask tsk = (StatsTask) genTableStats(pCtx, pCtx.getTopOps().values()
              .iterator().next(), root, outputs);
          root.addDependentTask(tsk);
          map.put(extractTableFullName(tsk), tsk);
        } catch (HiveException e) {
          throw new SemanticException(e);
        }
        genColumnStatsTask(pCtx.getAnalyzeRewrite(), loadFileWork, map, outerQueryLimit, 0);
      } else {
        Set<Task<?>> leafTasks = new LinkedHashSet<>();
        getLeafTasks(rootTasks, leafTasks);
        List<Task<?>> nonStatsLeafTasks = new ArrayList<>();
        for (Task<?> tsk : leafTasks) {
          // map table name to the correct ColumnStatsTask
          if (tsk instanceof StatsTask) {
            map.put(extractTableFullName((StatsTask) tsk), (StatsTask) tsk);
          } else {
            nonStatsLeafTasks.add(tsk);
          }
        }
        // add cStatsTask as a dependent of all the nonStatsLeafTasks
        for (Task<?> tsk : nonStatsLeafTasks) {
          for (Task<?> cStatsTask : map.values()) {
            tsk.addDependentTask(cStatsTask);
          }
        }
        for (ColumnStatsAutoGatherContext columnStatsAutoGatherContext : pCtx
            .getColumnStatsAutoGatherContexts()) {
          if (!columnStatsAutoGatherContext.isInsertInto()) {
            genColumnStatsTask(columnStatsAutoGatherContext.getAnalyzeRewrite(),
                columnStatsAutoGatherContext.getLoadFileWork(), map, outerQueryLimit, 0);
          } else {
            int numBitVector;
            try {
              numBitVector = HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf);
            } catch (Exception e) {
              // Same message as before, but the cause is now preserved.
              throw new SemanticException(e.getMessage(), e);
            }
            genColumnStatsTask(columnStatsAutoGatherContext.getAnalyzeRewrite(),
                columnStatsAutoGatherContext.getLoadFileWork(), map, outerQueryLimit, numBitVector);
          }
        }
      }
    }

    decideExecMode(rootTasks, ctx, globalLimitCtx);

    // for direct insert CTAS, we don't need this table creation DDL task, since the table will be created
    // ahead of time by the non-native table
    if (pCtx.getQueryProperties().isCTAS() && !pCtx.getCreateTable().isMaterialization() && !directInsert) {
      // generate a DDL task and make it a dependent task of the leaf
      CreateTableDesc crtTblDesc = pCtx.getCreateTable();
      crtTblDesc.validate(conf);
      Task<?> crtTblTask = TaskFactory.get(new DDLWork(inputs, outputs, crtTblDesc));
      patchUpAfterCTASorMaterializedView(rootTasks, inputs, outputs, crtTblTask,
          CollectionUtils.isEmpty(crtTblDesc.getPartColNames()));
    } else if (pCtx.getQueryProperties().isMaterializedView() && !directInsert) {
      // generate a DDL task and make it a dependent task of the leaf
      CreateMaterializedViewDesc viewDesc = pCtx.getCreateViewDesc();
      Task<?> crtViewTask = TaskFactory.get(new DDLWork(
          inputs, outputs, viewDesc));
      patchUpAfterCTASorMaterializedView(rootTasks, inputs, outputs, crtViewTask,
          CollectionUtils.isEmpty(viewDesc.getPartColNames()));
    } else if (pCtx.getMaterializedViewUpdateDesc() != null) {
      // If there is a materialized view update desc, we introduce it at the end
      // of the tree.
      MaterializedViewUpdateDesc materializedViewDesc = pCtx.getMaterializedViewUpdateDesc();
      DDLWork ddlWork = new DDLWork(inputs, outputs, materializedViewDesc);
      Set<Task<?>> leafTasks = new LinkedHashSet<>();
      getLeafTasks(rootTasks, leafTasks);
      Task<?> materializedViewTask = TaskFactory.get(ddlWork, conf);
      for (Task<?> task : leafTasks) {
        task.addDependentTask(materializedViewTask);
      }
    }

    if (globalLimitCtx.isEnable() && pCtx.getFetchTask() != null) {
      LOG.info("set least row check for FetchTask: " + globalLimitCtx.getGlobalLimit());
      pCtx.getFetchTask().getWork().setLeastNumRows(globalLimitCtx.getGlobalLimit());
    }

    if (globalLimitCtx.isEnable() && globalLimitCtx.getLastReduceLimitDesc() != null) {
      LOG.info("set least row check for LimitDesc: " + globalLimitCtx.getGlobalLimit());
      globalLimitCtx.getLastReduceLimitDesc().setLeastRows(globalLimitCtx.getGlobalLimit());
    }

    Interner<TableDesc> interner = Interners.newStrongInterner();

    // Perform Final chores on generated Map works
    //   1.  Intern the table descriptors
    //   2.  Derive final explain attributes based on previous compilation.
    GenMapRedUtils.finalMapWorkChores(rootTasks, pCtx.getConf(), interner);
  }