in fe/src/main/java/org/apache/impala/service/Frontend.java [2391:2720]
private TExecRequest getTExecRequest(CompilerFactory compilerFactory,
PlanCtx planCtx, EventSequence timeline) throws ImpalaException {
TQueryCtx queryCtx = planCtx.getQueryContext();
LOG.info("Analyzing query: " + queryCtx.client_request.stmt + " db: "
+ queryCtx.session.database);
TQueryOptions queryOptions = queryCtx.client_request.getQuery_options();
List<String> warnings = new ArrayList<>();
waitForHmsEvents(queryCtx, queryOptions, warnings, timeline);
boolean enable_replan = queryOptions.isEnable_replan();
final boolean clientSetRequestPool = queryOptions.isSetRequest_pool();
Preconditions.checkState(
!clientSetRequestPool || !queryOptions.getRequest_pool().isEmpty());
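// If the client set REQUEST_POOL and pool configs are available, check whether
// the pool is configured as coordinator-only.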
boolean coordOnlyRequestPool = false;
if (clientSetRequestPool && RequestPoolService.getInstance() != null) {
final String pool_name = StringUtils.prependIfMissing(
queryOptions.getRequest_pool(), ROOT_POOL_NAME + ".");
coordOnlyRequestPool =
RequestPoolService.getInstance().getPoolConfig(pool_name).only_coordinators;
}
List<TExecutorGroupSet> originalExecutorGroupSets;
if (coordOnlyRequestPool) {
// The query is set to use a coordinator-only request pool, which means that no
// executors will be involved in query execution. Thus, the planner must ignore
// all executor groups and select the default group instead. The backend will
// ensure the query is scheduled on the coordinators.
originalExecutorGroupSets = new ArrayList<>(1);
TExecutorGroupSet all_coords = new TExecutorGroupSet();
originalExecutorGroupSets.add(all_coords);
} else {
originalExecutorGroupSets = ExecutorMembershipSnapshot.getAllExecutorGroupSets();
}
LOG.info("The original executor group sets from executor membership snapshot: "
+ originalExecutorGroupSets);
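// A single group set with an empty name prefix means the cluster is running with
// the default (unnamed) executor group.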
boolean default_executor_group = false;
if (originalExecutorGroupSets.size() == 1) {
TExecutorGroupSet e = originalExecutorGroupSets.get(0);
default_executor_group = e.getExec_group_name_prefix() == null
|| e.getExec_group_name_prefix().isEmpty();
}
List<TExecutorGroupSet> executorGroupSetsToUse =
Frontend.setupThresholdsForExecutorGroupSets(originalExecutorGroupSets,
queryOptions.getRequest_pool(), default_executor_group,
enable_replan
&& (RuntimeEnv.INSTANCE.isTestEnv() || queryOptions.isTest_replan()));
int numExecutorGroupSets = executorGroupSetsToUse.size();
if (numExecutorGroupSets == 0) {
throw new AnalysisException(
"No suitable executor group sets can be identified and used.");
}
LOG.info("A total of {} executor group sets to be considered for auto-scaling: "
+ executorGroupSetsToUse,
numExecutorGroupSets);
TExecRequest req = null;
// Capture the current state.
planCtx.compilationState_.captureState();
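// Resolve the CPU scaling knobs: the divisor may be overridden per query via
// QUERY_CPU_COUNT_DIVISOR, while the root factor comes from backend config.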
boolean isComputeCost = queryOptions.isCompute_processing_cost();
double cpuCountRootFactor = BackendConfig.INSTANCE.getQueryCpuRootFactor();
double cpuCountDivisor = BackendConfig.INSTANCE.getQueryCpuCountDivisor();
if (isComputeCost) {
if (queryOptions.isSetQuery_cpu_count_divisor()) {
cpuCountDivisor = queryOptions.getQuery_cpu_count_divisor();
}
FrontendProfile.getCurrent().setToCounter(CPU_COUNT_DIVISOR, TUnit.DOUBLE_VALUE,
Double.doubleToLongBits(cpuCountDivisor));
}
TExecutorGroupSet group_set = null;
String reason = "Unknown";
int attempt = 0;
int firstExecutorGroupTotalCores = expectedTotalCores(executorGroupSetsToUse.get(0));
int lastExecutorGroupTotalCores =
expectedTotalCores(executorGroupSetsToUse.get(numExecutorGroupSets - 1));
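// Iterate over the executor group sets from smallest to largest, compiling the
// query against each until its memory and CPU requirements fit or a match is
// otherwise forced (e.g. by a client-set request pool).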
int i = 0;
while (i < numExecutorGroupSets) {
boolean isLastEG = (i == numExecutorGroupSets - 1);
boolean skipResourceCheckingAtLastEG = isLastEG
&& BackendConfig.INSTANCE.isSkipResourceCheckingOnLastExecutorGroupSet();
group_set = executorGroupSetsToUse.get(i);
planCtx.compilationState_.setGroupSet(group_set);
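// Derive the per-node core budget for planning: with COMPUTE_PROCESSING_COST,
// assume the largest group set's total cores spread across this group set's
// executors (at least PROCESSING_COST_MIN_THREADS); otherwise fall back to MT_DOP.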
if (isComputeCost) {
planCtx.compilationState_.setAvailableCoresPerNode(
Math.max(queryOptions.getProcessing_cost_min_threads(),
lastExecutorGroupTotalCores / expectedNumExecutor(group_set)));
} else {
planCtx.compilationState_.setAvailableCoresPerNode(queryOptions.getMt_dop());
}
LOG.info("Consider executor group set: " + group_set + " with assumption of "
+ planCtx.compilationState_.getAvailableCoresPerNode() + " cores per node.");
String retryMsg = "";
while (true) {
try {
req = doCreateExecRequest(compilerFactory, planCtx, warnings, timeline);
markTimelineRetries(attempt, retryMsg, timeline);
break;
} catch (InconsistentMetadataFetchException e) {
if (attempt++ == INCONSISTENT_METADATA_NUM_RETRIES) {
markTimelineRetries(attempt, e.getMessage(), timeline);
throw e;
}
planCtx.compilationState_.disableStmtCacheAndReauthorize();
if (attempt > 1) {
// Back off a bit on later retries.
Uninterruptibles.sleepUninterruptibly(200 * attempt, TimeUnit.MILLISECONDS);
}
retryMsg = e.getMessage();
LOG.warn("Retrying plan of query {}: {} (retry #{} of {})",
queryCtx.client_request.stmt, retryMsg, attempt,
INCONSISTENT_METADATA_NUM_RETRIES);
}
}
// Counters about this group set.
int availableCores = expectedTotalCores(group_set);
String profileName = "Executor group " + (i + 1);
if (group_set.isSetExec_group_name_prefix()
&& !group_set.getExec_group_name_prefix().isEmpty()) {
profileName += " (" + group_set.getExec_group_name_prefix() + ")";
}
TRuntimeProfileNode groupSetProfile = createTRuntimeProfileNode(profileName);
addCounter(groupSetProfile,
new TCounter(MEMORY_MAX, TUnit.BYTES,
LongMath.saturatedMultiply(
expectedNumExecutor(group_set), group_set.getMax_mem_limit())));
if (isComputeCost) {
addCounter(groupSetProfile, new TCounter(CPU_MAX, TUnit.UNIT, availableCores));
}
// If this is a single-node plan, replanning is disabled, the query cannot be
// auto-scaled, or a coordinator-only request pool is used, return the first
// plan generated.
boolean notScalable = false;
if (queryOptions.num_nodes == 1) {
reason = "the number of nodes is 1";
notScalable = true;
} else if (!enable_replan) {
reason = "query option ENABLE_REPLAN=false";
notScalable = true;
} else if (!Frontend.canStmtBeAutoScaled(req)) {
reason = "query is not auto-scalable";
notScalable = true;
} else if (coordOnlyRequestPool) {
reason = "only coordinators request pool specified";
notScalable = true;
}
if (notScalable) {
setGroupNamePrefix(default_executor_group, clientSetRequestPool, req, group_set);
addInfoString(
groupSetProfile, VERDICT, "Assign to first group because " + reason);
FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile);
break;
}
// Find the per-host memory estimate from one of two possible sources.
long perHostMemEstimate = -1;
int cpuAskBounded = -1;
int cpuAskUnbounded = -1;
if (req.query_exec_request != null) {
// For non-explain queries
perHostMemEstimate = req.query_exec_request.per_host_mem_estimate;
cpuAskBounded = req.query_exec_request.getCores_required();
cpuAskUnbounded = req.query_exec_request.getCores_required_unbounded();
} else {
// For explain queries
perHostMemEstimate = planCtx.compilationState_.getEstimatedMemoryPerHost();
cpuAskBounded = planCtx.compilationState_.getCoresRequired();
cpuAskUnbounded = planCtx.compilationState_.getCoresRequiredUnbounded();
}
Preconditions.checkState(perHostMemEstimate >= 0);
boolean memReqSatisfied = perHostMemEstimate <= group_set.getMax_mem_limit();
addCounter(groupSetProfile,
new TCounter(MEMORY_ASK, TUnit.BYTES,
LongMath.saturatedMultiply(
expectedNumExecutor(group_set), perHostMemEstimate)));
boolean cpuReqSatisfied = true;
int scaledCpuAskBounded = -1;
int scaledCpuAskUnbounded = -1;
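// With COMPUTE_PROCESSING_COST enabled, validate the parallelism options and
// scale the planner's CPU ask before comparing it against the group set's cores.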
if (isComputeCost) {
Preconditions.checkState(cpuAskBounded > 0);
Preconditions.checkState(cpuAskUnbounded > 0);
if (queryOptions.getProcessing_cost_min_threads()
> queryOptions.getMax_fragment_instances_per_node()) {
throw new AnalysisException(
TImpalaQueryOptions.PROCESSING_COST_MIN_THREADS.name() + " ("
+ queryOptions.getProcessing_cost_min_threads()
+ ") can not be larger than "
+ TImpalaQueryOptions.MAX_FRAGMENT_INSTANCES_PER_NODE.name() + " ("
+ queryOptions.getMax_fragment_instances_per_node() + ").");
}
// Mark parallelism before scaling.
addCounter(
groupSetProfile, new TCounter(MAX_PARALLELISM, TUnit.UNIT, cpuAskUnbounded));
addCounter(groupSetProfile,
new TCounter(EFFECTIVE_PARALLELISM, TUnit.UNIT, cpuAskBounded));
// Scale down cpuAskUnbounded using a non-linear function, flooring the result
// at cpuAskBounded.
cpuAskUnbounded = scaleDownCpuSublinear(cpuCountRootFactor,
firstExecutorGroupTotalCores, cpuAskBounded, cpuAskUnbounded);
// Apply a second, linear scale-down using the QUERY_CPU_COUNT_DIVISOR option.
scaledCpuAskBounded = scaleDownCpuLinear(cpuCountDivisor, cpuAskBounded);
scaledCpuAskUnbounded = scaleDownCpuLinear(cpuCountDivisor, cpuAskUnbounded);
// Check whether the CPU requirement is satisfied.
Preconditions.checkState(scaledCpuAskBounded <= scaledCpuAskUnbounded);
cpuReqSatisfied = scaledCpuAskUnbounded <= availableCores;
// Mark parallelism after scaling.
addCounter(
groupSetProfile, new TCounter(CPU_ASK, TUnit.UNIT, scaledCpuAskUnbounded));
addCounter(groupSetProfile,
new TCounter(CPU_ASK_BOUNDED, TUnit.UNIT, scaledCpuAskBounded));
}
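// Decide whether this group set is a match: a client-set request pool always
// matches (resource checking is skipped); otherwise both the memory and CPU
// requirements must be satisfied.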
boolean matchFound = false;
if (clientSetRequestPool) {
if (!default_executor_group) {
Preconditions.checkState(group_set.getExec_group_name_prefix().endsWith(
queryOptions.getRequest_pool()));
}
reason = "query option REQUEST_POOL=" + queryOptions.getRequest_pool()
+ " is set. Memory and cpu limit checking is skipped.";
addInfoString(groupSetProfile, VERDICT, reason);
matchFound = true;
} else if (memReqSatisfied && cpuReqSatisfied) {
reason = "suitable group found. "
+ MoreObjects.toStringHelper("requirement")
.add(MEMORY_ASK, PrintUtils.printBytes(perHostMemEstimate))
.add(CPU_ASK, scaledCpuAskUnbounded)
.add(CPU_ASK_BOUNDED, scaledCpuAskBounded)
.add(EFFECTIVE_PARALLELISM, cpuAskBounded)
.toString();
addInfoString(groupSetProfile, VERDICT, "Match");
matchFound = true;
}
if (!matchFound && skipResourceCheckingAtLastEG) {
reason = "no executor group set fit. Admit to last executor group set.";
addInfoString(groupSetProfile, VERDICT, reason);
matchFound = true;
}
// Append this exec group set profile node.
FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile);
if (matchFound) {
setGroupNamePrefix(default_executor_group, clientSetRequestPool, req, group_set);
if (isComputeCost && req.query_exec_request != null
&& queryOptions.slot_count_strategy == TSlotCountStrategy.PLANNER_CPU_ASK) {
// Use 'cpuAskBounded' because it is the actual total number of fragment
// instances that will be distributed.
int avgSlotsUsePerBackend =
getAvgSlotsUsePerBackend(req, cpuAskBounded, group_set);
FrontendProfile.getCurrent().setToCounter(
AVG_ADMISSION_SLOTS_PER_EXECUTOR, TUnit.UNIT, avgSlotsUsePerBackend);
req.query_exec_request.setMax_slot_per_executor(
group_set.getNum_cores_per_executor());
}
break;
}
// At this point no match was found, so the planner steps up to the next bigger
// executor group set.
List<String> verdicts = Lists.newArrayListWithCapacity(2);
List<String> reasons = Lists.newArrayListWithCapacity(2);
if (!memReqSatisfied) {
String verdict = "not enough per-host memory";
verdicts.add(verdict);
reasons.add(verdict + " (require=" + perHostMemEstimate
+ ", max=" + group_set.getMax_mem_limit() + ")");
}
if (!cpuReqSatisfied) {
String verdict = "not enough cpu cores";
verdicts.add(verdict);
reasons.add(verdict + " (require=" + scaledCpuAskUnbounded
+ ", max=" + availableCores + ")");
}
reason = String.join(", ", reasons);
addInfoString(groupSetProfile, VERDICT, String.join(", ", verdicts));
group_set = null;
// Restore the captured state.
planCtx.compilationState_.restoreState();
FrontendProfile.getCurrent().addToCounter(
EXECUTOR_GROUPS_CONSIDERED, TUnit.UNIT, 1);
// Clear profile children that will not be used.
FrontendProfile.getCurrent().clearStagedChildrenProfiles();
i++;
}
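// Either a group set was selected above (group_set != null), or every candidate
// was rejected and the last rejection reason is surfaced to the user.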
if (group_set == null) {
if (reason.equals("Unknown")) {
throw new AnalysisException("The query does not fit any executor group sets.");
} else {
throw new AnalysisException(
"The query does not fit largest executor group sets. Reason: " + reason
+ ".");
}
} else {
// This group_set is a match.
FrontendProfile.getCurrent().addToCounter(
EXECUTOR_GROUPS_CONSIDERED, TUnit.UNIT, 1);
}
LOG.info("Selected executor group: " + group_set + ", reason: " + reason);
FrontendProfile.getCurrent().finalizeStagedChildrenProfiles();
// Transfer the profile access flag collected during the first compilation.
req.setUser_has_profile_access(planCtx.compilationState_.userHasProfileAccess());
return req;
}