private TExecRequest getTExecRequest()

in fe/src/main/java/org/apache/impala/service/Frontend.java [2391:2720]


  /**
   * Builds the {@link TExecRequest} for the query in 'planCtx', choosing the executor
   * group set the query should be assigned to.
   *
   * The candidate executor group sets are visited in the order returned by
   * setupThresholdsForExecutorGroupSets(). For each candidate the query is compiled via
   * doCreateExecRequest() and the resulting per-host memory estimate (and, when
   * COMPUTE_PROCESSING_COST is enabled, the scaled CPU ask) is compared against the
   * limits of that group set. The first candidate that fits is selected. If a candidate
   * does not fit, the compilation state captured before the loop is restored and the
   * next candidate is tried.
   *
   * Resource checking is bypassed, and the current candidate is selected, when:
   * - the query is not scalable (NUM_NODES=1, ENABLE_REPLAN=false, the statement can not
   *   be auto-scaled, or an only-coordinators request pool was requested),
   * - the client set REQUEST_POOL explicitly, or
   * - the last candidate is reached and the backend is configured to skip resource
   *   checking on the last executor group set.
   *
   * A child profile node with counters and a verdict is recorded in the current
   * FrontendProfile for every candidate considered.
   *
   * @param compilerFactory passed through to doCreateExecRequest().
   * @param planCtx planning context; its query context supplies the query options and
   *     its compilation state is captured, mutated and restored by this method.
   * @param timeline event sequence passed to the HMS event wait, to compilation and to
   *     the retry markers.
   * @return the exec request produced for the selected executor group set.
   * @throws AnalysisException if there is no usable executor group set, if
   *     PROCESSING_COST_MIN_THREADS exceeds MAX_FRAGMENT_INSTANCES_PER_NODE, or if the
   *     query does not fit any executor group set.
   * @throws InconsistentMetadataFetchException if compilation still fails with this
   *     exception after INCONSISTENT_METADATA_NUM_RETRIES retries.
   * @throws ImpalaException any other failure raised by the helpers called here.
   */
  private TExecRequest getTExecRequest(CompilerFactory compilerFactory,
      PlanCtx planCtx, EventSequence timeline) throws ImpalaException {
    TQueryCtx queryCtx = planCtx.getQueryContext();
    LOG.info("Analyzing query: " + queryCtx.client_request.stmt + " db: "
        + queryCtx.session.database);

    TQueryOptions queryOptions = queryCtx.client_request.getQuery_options();
    List<String> warnings = new ArrayList<>();
    waitForHmsEvents(queryCtx, queryOptions, warnings, timeline);
    boolean enable_replan = queryOptions.isEnable_replan();
    final boolean clientSetRequestPool = queryOptions.isSetRequest_pool();
    // An explicitly set request pool must not be the empty string.
    Preconditions.checkState(
        !clientSetRequestPool || !queryOptions.getRequest_pool().isEmpty());

    boolean coordOnlyRequestPool = false;

    if (clientSetRequestPool && RequestPoolService.getInstance() != null) {
      // Pool configs are looked up by the name qualified with the root pool prefix.
      final String pool_name = StringUtils.prependIfMissing(
          queryOptions.getRequest_pool(), ROOT_POOL_NAME + ".");

      coordOnlyRequestPool =
          RequestPoolService.getInstance().getPoolConfig(pool_name).only_coordinators;
    }

    List<TExecutorGroupSet> originalExecutorGroupSets;
    if (coordOnlyRequestPool) {
      // The query is set to use an only coordinators request pool which means that no
      // executors will be involved in query execution. Thus, the planner must ignore
      // all executor groups and select the default group instead. The backend will ensure
      // the query is scheduled on the coordinators.
      originalExecutorGroupSets = new ArrayList<>(1);
      TExecutorGroupSet all_coords = new TExecutorGroupSet();
      originalExecutorGroupSets.add(all_coords);
    } else {
      originalExecutorGroupSets = ExecutorMembershipSnapshot.getAllExecutorGroupSets();
    }

    LOG.info("The original executor group sets from executor membership snapshot: "
        + originalExecutorGroupSets);

    // A single group set with a null or empty name prefix is treated as the default
    // executor group.
    boolean default_executor_group = false;
    if (originalExecutorGroupSets.size() == 1) {
      TExecutorGroupSet e = originalExecutorGroupSets.get(0);
      default_executor_group = e.getExec_group_name_prefix() == null
          || e.getExec_group_name_prefix().isEmpty();
    }
    List<TExecutorGroupSet> executorGroupSetsToUse =
        Frontend.setupThresholdsForExecutorGroupSets(originalExecutorGroupSets,
            queryOptions.getRequest_pool(), default_executor_group,
            enable_replan
                && (RuntimeEnv.INSTANCE.isTestEnv() || queryOptions.isTest_replan()));

    int numExecutorGroupSets = executorGroupSetsToUse.size();
    if (numExecutorGroupSets == 0) {
      throw new AnalysisException(
          "No suitable executor group sets can be identified and used.");
    }
    // Pass the list as a logging argument rather than concatenating it into the format
    // string, so that its textual form is never interpreted as a placeholder.
    LOG.info(
        "A total of {} executor group sets to be considered for auto-scaling: {}",
        numExecutorGroupSets, executorGroupSetsToUse);

    TExecRequest req = null;

    // Capture the current state.
    planCtx.compilationState_.captureState();
    boolean isComputeCost = queryOptions.isCompute_processing_cost();

    double cpuCountRootFactor = BackendConfig.INSTANCE.getQueryCpuRootFactor();
    double cpuCountDivisor = BackendConfig.INSTANCE.getQueryCpuCountDivisor();
    if (isComputeCost) {
      // The query option, when set, overrides the backend-wide divisor.
      if (queryOptions.isSetQuery_cpu_count_divisor()) {
        cpuCountDivisor = queryOptions.getQuery_cpu_count_divisor();
      }
      // The counter value is a long, so the double is stored as its raw bit pattern.
      FrontendProfile.getCurrent().setToCounter(CPU_COUNT_DIVISOR, TUnit.DOUBLE_VALUE,
          Double.doubleToLongBits(cpuCountDivisor));
    }

    TExecutorGroupSet group_set = null;
    String reason = "Unknown";
    // Declared outside the loop: the metadata retry budget is shared by all candidates.
    int attempt = 0;
    int firstExecutorGroupTotalCores = expectedTotalCores(executorGroupSetsToUse.get(0));
    int lastExecutorGroupTotalCores =
        expectedTotalCores(executorGroupSetsToUse.get(numExecutorGroupSets - 1));
    for (int i = 0; i < numExecutorGroupSets; i++) {
      boolean isLastEG = (i == numExecutorGroupSets - 1);
      boolean skipResourceCheckingAtLastEG = isLastEG
          && BackendConfig.INSTANCE.isSkipResourceCheckingOnLastExecutorGroupSet();

      group_set = executorGroupSetsToUse.get(i);
      planCtx.compilationState_.setGroupSet(group_set);
      if (isComputeCost) {
        planCtx.compilationState_.setAvailableCoresPerNode(
            Math.max(queryOptions.getProcessing_cost_min_threads(),
                lastExecutorGroupTotalCores / expectedNumExecutor(group_set)));
      } else {
        // MT_DOP can be 0 (multithreading disabled). Clamp to 1 so that the assumed
        // number of cores per node is never zero.
        planCtx.compilationState_.setAvailableCoresPerNode(
            Math.max(queryOptions.getMt_dop(), 1));
      }
      LOG.info("Consider executor group set: " + group_set + " with assumption of "
          + planCtx.compilationState_.getAvailableCoresPerNode() + " cores per node.");

      // Compile the query, retrying when the metadata fetched was inconsistent.
      String retryMsg = "";
      while (true) {
        try {
          req = doCreateExecRequest(compilerFactory, planCtx, warnings, timeline);
          markTimelineRetries(attempt, retryMsg, timeline);
          break;
        } catch (InconsistentMetadataFetchException e) {
          // Give up and rethrow once the retry budget is exhausted.
          if (attempt++ == INCONSISTENT_METADATA_NUM_RETRIES) {
            markTimelineRetries(attempt, e.getMessage(), timeline);
            throw e;
          }
          planCtx.compilationState_.disableStmtCacheAndReauthorize();
          if (attempt > 1) {
            // Back-off a bit on later retries.
            Uninterruptibles.sleepUninterruptibly(200 * attempt, TimeUnit.MILLISECONDS);
          }
          retryMsg = e.getMessage();
          LOG.warn("Retrying plan of query {}: {} (retry #{} of {})",
              queryCtx.client_request.stmt, retryMsg, attempt,
              INCONSISTENT_METADATA_NUM_RETRIES);
        }
      }

      // Counters about this group set.
      int availableCores = expectedTotalCores(group_set);
      String profileName = "Executor group " + (i + 1);
      if (group_set.isSetExec_group_name_prefix()
          && !group_set.getExec_group_name_prefix().isEmpty()) {
        profileName += " (" + group_set.getExec_group_name_prefix() + ")";
      }
      TRuntimeProfileNode groupSetProfile = createTRuntimeProfileNode(profileName);
      // Saturating multiply keeps a very large per-executor limit from overflowing.
      addCounter(groupSetProfile,
          new TCounter(MEMORY_MAX, TUnit.BYTES,
              LongMath.saturatedMultiply(
                  expectedNumExecutor(group_set), group_set.getMax_mem_limit())));
      if (isComputeCost) {
        addCounter(groupSetProfile, new TCounter(CPU_MAX, TUnit.UNIT, availableCores));
      }

      // If it is for a single node plan, enable_replan is disabled, or it is not a query
      // that can be auto scaled, return the 1st plan generated.
      boolean notScalable = false;
      if (queryOptions.num_nodes == 1) {
        reason = "the number of nodes is 1";
        notScalable = true;
      } else if (!enable_replan) {
        reason = "query option ENABLE_REPLAN=false";
        notScalable = true;
      } else if (!Frontend.canStmtBeAutoScaled(req)) {
        reason = "query is not auto-scalable";
        notScalable = true;
      } else if (coordOnlyRequestPool) {
        reason = "only coordinators request pool specified";
        notScalable = true;
      }

      if (notScalable) {
        setGroupNamePrefix(default_executor_group, clientSetRequestPool, req, group_set);
        addInfoString(
            groupSetProfile, VERDICT, "Assign to first group because " + reason);
        FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile);
        break;
      }

      // Find out the per host memory estimated from two possible sources.
      long perHostMemEstimate = -1;
      int cpuAskBounded = -1;
      int cpuAskUnbounded = -1;
      if (req.query_exec_request != null) {
        // For non-explain queries
        perHostMemEstimate = req.query_exec_request.per_host_mem_estimate;
        cpuAskBounded = req.query_exec_request.getCores_required();
        cpuAskUnbounded = req.query_exec_request.getCores_required_unbounded();
      } else {
        // For explain queries
        perHostMemEstimate = planCtx.compilationState_.getEstimatedMemoryPerHost();
        cpuAskBounded = planCtx.compilationState_.getCoresRequired();
        cpuAskUnbounded = planCtx.compilationState_.getCoresRequiredUnbounded();
      }

      Preconditions.checkState(perHostMemEstimate >= 0);
      boolean memReqSatisfied = perHostMemEstimate <= group_set.getMax_mem_limit();
      addCounter(groupSetProfile,
          new TCounter(MEMORY_ASK, TUnit.BYTES,
              LongMath.saturatedMultiply(
                  expectedNumExecutor(group_set), perHostMemEstimate)));

      // CPU is only checked when processing cost is computed; otherwise it is always
      // considered satisfied and the scaled asks keep their -1 placeholder values.
      boolean cpuReqSatisfied = true;
      int scaledCpuAskBounded = -1;
      int scaledCpuAskUnbounded = -1;
      if (isComputeCost) {
        Preconditions.checkState(cpuAskBounded > 0);
        Preconditions.checkState(cpuAskUnbounded > 0);
        if (queryOptions.getProcessing_cost_min_threads()
            > queryOptions.getMax_fragment_instances_per_node()) {
          throw new AnalysisException(
              TImpalaQueryOptions.PROCESSING_COST_MIN_THREADS.name() + " ("
              + queryOptions.getProcessing_cost_min_threads()
              + ") can not be larger than "
              + TImpalaQueryOptions.MAX_FRAGMENT_INSTANCES_PER_NODE.name() + " ("
              + queryOptions.getMax_fragment_instances_per_node() + ").");
        }

        // Mark parallelism before scaling.
        addCounter(
            groupSetProfile, new TCounter(MAX_PARALLELISM, TUnit.UNIT, cpuAskUnbounded));
        addCounter(groupSetProfile,
            new TCounter(EFFECTIVE_PARALLELISM, TUnit.UNIT, cpuAskBounded));

        // Scale down cpuAskUnbounded using non-linear function, but cap it at least
        // equal to cpuAskBounded at minimum.
        cpuAskUnbounded = scaleDownCpuSublinear(cpuCountRootFactor,
            firstExecutorGroupTotalCores, cpuAskBounded, cpuAskUnbounded);

        // Do another scale down, but linearly using QUERY_CPU_COUNT_DIVISOR option.
        scaledCpuAskBounded = scaleDownCpuLinear(cpuCountDivisor, cpuAskBounded);
        scaledCpuAskUnbounded = scaleDownCpuLinear(cpuCountDivisor, cpuAskUnbounded);

        // Check if cpu requirement match.
        Preconditions.checkState(scaledCpuAskBounded <= scaledCpuAskUnbounded);
        cpuReqSatisfied = scaledCpuAskUnbounded <= availableCores;

        // Mark parallelism after scaling.
        addCounter(
            groupSetProfile, new TCounter(CPU_ASK, TUnit.UNIT, scaledCpuAskUnbounded));
        addCounter(groupSetProfile,
            new TCounter(CPU_ASK_BOUNDED, TUnit.UNIT, scaledCpuAskBounded));
      }

      boolean matchFound = false;
      if (clientSetRequestPool) {
        // An explicit REQUEST_POOL wins regardless of the resource checks above.
        if (!default_executor_group) {
          Preconditions.checkState(group_set.getExec_group_name_prefix().endsWith(
              queryOptions.getRequest_pool()));
        }
        reason = "query option REQUEST_POOL=" + queryOptions.getRequest_pool()
            + " is set. Memory and cpu limit checking is skipped.";
        addInfoString(groupSetProfile, VERDICT, reason);
        matchFound = true;
      } else if (memReqSatisfied && cpuReqSatisfied) {
        reason = "suitable group found. "
            + MoreObjects.toStringHelper("requirement")
                  .add(MEMORY_ASK, PrintUtils.printBytes(perHostMemEstimate))
                  .add(CPU_ASK, scaledCpuAskUnbounded)
                  .add(CPU_ASK_BOUNDED, scaledCpuAskBounded)
                  .add(EFFECTIVE_PARALLELISM, cpuAskBounded)
                  .toString();
        addInfoString(groupSetProfile, VERDICT, "Match");
        matchFound = true;
      }

      if (!matchFound && skipResourceCheckingAtLastEG) {
        reason = "no executor group set fit. Admit to last executor group set.";
        addInfoString(groupSetProfile, VERDICT, reason);
        matchFound = true;
      }

      // Append this exec group set profile node.
      FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile);

      if (matchFound) {
        setGroupNamePrefix(default_executor_group, clientSetRequestPool, req, group_set);
        if (isComputeCost && req.query_exec_request != null
            && queryOptions.slot_count_strategy == TSlotCountStrategy.PLANNER_CPU_ASK) {
          // Use 'cpuAskBounded' because it is the actual total number of fragment
          // instances that will be distributed.
          int avgSlotsUsePerBackend =
              getAvgSlotsUsePerBackend(req, cpuAskBounded, group_set);
          FrontendProfile.getCurrent().setToCounter(
              AVG_ADMISSION_SLOTS_PER_EXECUTOR, TUnit.UNIT, avgSlotsUsePerBackend);
          req.query_exec_request.setMax_slot_per_executor(
              group_set.getNum_cores_per_executor());
        }
        break;
      }

      // At this point, no match is found and planner will step up to the next bigger
      // executor group set.
      List<String> verdicts = Lists.newArrayListWithCapacity(2);
      List<String> reasons = Lists.newArrayListWithCapacity(2);
      if (!memReqSatisfied) {
        String verdict = "not enough per-host memory";
        verdicts.add(verdict);
        reasons.add(verdict + " (require=" + perHostMemEstimate
            + ", max=" + group_set.getMax_mem_limit() + ")");
      }
      if (!cpuReqSatisfied) {
        String verdict = "not enough cpu cores";
        verdicts.add(verdict);
        reasons.add(verdict + " (require=" + scaledCpuAskUnbounded
            + ", max=" + availableCores + ")");
      }
      reason = String.join(", ", reasons);
      addInfoString(groupSetProfile, VERDICT, String.join(", ", verdicts));
      // Reset so that falling out of the loop without a match is detectable below.
      group_set = null;

      // Restore to the captured state.
      planCtx.compilationState_.restoreState();
      FrontendProfile.getCurrent().addToCounter(
          EXECUTOR_GROUPS_CONSIDERED, TUnit.UNIT, 1);
      // Clear profile children that will not be used.
      FrontendProfile.getCurrent().clearStagedChildrenProfiles();
    }

    if (group_set == null) {
      // Every candidate was rejected. 'reason' holds the rejection of the last one.
      if (reason.equals("Unknown")) {
        throw new AnalysisException("The query does not fit any executor group sets.");
      } else {
        throw new AnalysisException(
            "The query does not fit largest executor group sets. Reason: " + reason
            + ".");
      }
    } else {
      // This group_set is a match.
      FrontendProfile.getCurrent().addToCounter(
          EXECUTOR_GROUPS_CONSIDERED, TUnit.UNIT, 1);
    }

    LOG.info("Selected executor group: " + group_set + ", reason: " + reason);

    FrontendProfile.getCurrent().finalizeStagedChildrenProfiles();

    // Transfer the profile access flag which is collected during 1st compilation.
    req.setUser_has_profile_access(planCtx.compilationState_.userHasProfileAccess());

    return req;
  }