in fe/src/main/java/org/apache/impala/service/Frontend.java [2884:3165]
private TExecRequest doCreateExecRequest(CompilerFactory compilerFactory,
PlanCtx planCtx, List<String> warnings, EventSequence timeline)
throws ImpalaException {
TQueryCtx queryCtx = planCtx.getQueryContext();
// Parse stmt and collect/load metadata to populate a stmt-local table cache
ParsedStatement parsedStmt = compilerFactory.createParsedStatement(queryCtx);
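// Determine the effective user of the session; it is used for metadata loading
// and authorization.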
User user = new User(TSessionStateUtil.getEffectiveUser(queryCtx.session));
StmtMetadataLoader metadataLoader = new StmtMetadataLoader(
this, queryCtx.session.database, timeline, user, queryCtx.getQuery_id());
// TODO (IMPALA-8788): should load table write ids in transaction context.
StmtTableCache stmtTableCache = metadataLoader.loadTables(parsedStmt);
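// Optional test hook: inject a delay after table loading when the
// LOAD_TABLES_DELAY debug action is set.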
if (queryCtx.client_request.query_options.isSetDebug_action()) {
DebugUtils.executeDebugAction(
queryCtx.client_request.query_options.getDebug_action(),
DebugUtils.LOAD_TABLES_DELAY);
}
// Add referenced tables to the frontend profile.
FrontendProfile.getCurrent().addInfoString("Referenced Tables",
stmtTableCache.tables.keySet()
.stream()
.map(TableName::toString)
.collect(Collectors.joining(", ")));
// Add the catalog versions and loaded timestamps.
FrontendProfile.getCurrent().addInfoString("Original Table Versions",
stmtTableCache.tables.values().stream()
.map(t -> String.join(", ", t.getFullName(),
Long.toString(t.getCatalogVersion()),
Long.toString(t.getLastLoadedTimeMs()),
new Date(t.getLastLoadedTimeMs()).toString()))
.collect(Collectors.joining("\n")));
// Analyze and authorize stmt
AnalysisContext analysisCtx = new AnalysisContext(queryCtx, authzFactory_, timeline);
AnalysisResult analysisResult = analysisCtx.analyzeAndAuthorize(compilerFactory,
parsedStmt, stmtTableCache, authzChecker_.get(),
planCtx.compilationState_.disableAuthorization());
// Re-fetch the parsed statement because analysis may have rewritten it.
parsedStmt = analysisResult.getParsedStmt();
Preconditions.checkState(analysisResult.getException() == null);
if (!planCtx.compilationState_.disableAuthorization()) {
LOG.info("Analysis and authorization finished.");
planCtx.compilationState_.setUserHasProfileAccess(
analysisResult.userHasProfileAccess());
} else {
LOG.info("Analysis finished.");
}
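// Attach any warnings collected so far to the analyzer.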
analysisResult.getAnalyzer().addWarnings(warnings);
TExecRequest result = createBaseExecRequest(queryCtx, analysisResult);
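// Record the referenced tables and per-clause column usage in the request.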
for (TableName table : stmtTableCache.tables.keySet()) {
result.addToTables(table.toThrift());
}
for (String column : analysisResult.getAnalyzer().selectColumns()) {
result.addToSelect_columns(column);
}
for (String column : analysisResult.getAnalyzer().whereColumns()) {
result.addToWhere_columns(column);
}
for (String column : analysisResult.getAnalyzer().aggregateColumns()) {
result.addToAggregate_columns(column);
}
for (String column : analysisResult.getAnalyzer().orderByColumns()) {
result.addToOrderby_columns(column);
}
// Transfer the expected number of executors in the executor group set to the
// analyzer's global state. The info is needed to compute the number of nodes
// used during the planner phase for scans (see HdfsScanNode.computeNumNodes()).
analysisResult.getAnalyzer().setNumExecutorsForPlanning(
expectedNumExecutor(planCtx.compilationState_.getGroupSet()));
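// Likewise propagate the available cores per node and the pool's memory limit
// to the analyzer.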
analysisResult.getAnalyzer().setAvailableCoresPerNode(
Math.max(1, planCtx.compilationState_.getAvailableCoresPerNode()));
analysisResult.getAnalyzer().setPoolMemLimit(planCtx.compilationState_.getGroupSet());
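// From here on, any Hive or Kudu transaction opened during compilation must be
// aborted on failure; see the catch block below.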
try {
TQueryOptions queryOptions = queryCtx.client_request.query_options;
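// Catalog operations are compiled into a catalog op request; all of them
// except CTAS return without planning.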
if (analysisResult.isCatalogOp()) {
result.stmt_type = TStmtType.DDL;
createCatalogOpRequest(analysisResult, result);
TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph();
if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
result.catalog_op_request.setLineage_graph(thriftLineageGraph);
}
setMtDopForCatalogOp(analysisResult, queryOptions);
// All DDL operations except CTAS are complete once analysis is done; CTAS
// continues on to planning.
if (!analysisResult.isCreateTableAsSelectStmt()) {
return result;
}
}
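// For a non-EXPLAIN INSERT or CTAS into an ACID table, open a Hive transaction
// and allocate a write id on the first compilation; retries reuse the saved
// write id.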
if (!analysisResult.isExplainStmt() &&
(analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt())) {
InsertStmt insertStmt = analysisResult.getInsertStmt();
FeTable targetTable = insertStmt.getTargetTable();
if (AcidUtils.isTransactionalTable(targetTable)) {
if (planCtx.compilationState_.getWriteId() == -1) {
// First-time compilation: open a transaction and save the writeId.
long txnId = openTransaction(queryCtx);
timeline.markEvent("Transaction opened (" + txnId + ")");
Collection<FeTable> tables = stmtTableCache.tables.values();
String staticPartitionTarget = null;
if (insertStmt.isStaticPartitionTarget()) {
staticPartitionTarget = FeCatalogUtils.getPartitionName(
insertStmt.getPartitionKeyValues());
}
long writeId = allocateWriteId(queryCtx, targetTable);
insertStmt.setWriteId(writeId);
createLockForInsert(txnId, tables, targetTable, insertStmt.isOverwrite(),
staticPartitionTarget, queryOptions);
planCtx.compilationState_.setWriteId(writeId);
} else {
// Continue the transaction by reusing the writeId.
insertStmt.setWriteId(planCtx.compilationState_.getWriteId());
}
}
} else if (analysisResult.isLoadDataStmt()) {
result.stmt_type = TStmtType.LOAD;
result.setResult_set_metadata(new TResultSetMetadata(
Collections.singletonList(new TColumn("summary", Type.STRING.toThrift()))));
result.setLoad_data_request(analysisResult.getLoadDataStmt().toThrift());
return result;
} else if (analysisResult.isSetStmt()) {
result.stmt_type = TStmtType.SET;
result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
new TColumn("option", Type.STRING.toThrift()),
new TColumn("value", Type.STRING.toThrift()),
new TColumn("level", Type.STRING.toThrift()))));
result.setSet_query_option_request(analysisResult.getSetStmt().toThrift());
return result;
} else if (analysisResult.isAdminFnStmt()) {
result.stmt_type = TStmtType.ADMIN_FN;
result.setResult_set_metadata(new TResultSetMetadata(
Collections.singletonList(new TColumn("summary", Type.STRING.toThrift()))));
result.setAdmin_request(analysisResult.getAdminFnStmt().toThrift());
return result;
} else if (analysisResult.isConvertTableToIcebergStmt()) {
result.stmt_type = TStmtType.CONVERT;
result.setResult_set_metadata(new TResultSetMetadata(
Collections.singletonList(new TColumn("summary", Type.STRING.toThrift()))));
result.setConvert_table_request(
analysisResult.getConvertTableToIcebergStmt().toThrift());
return result;
} else if (analysisResult.isTestCaseStmt()) {
CopyTestCaseStmt testCaseStmt = (CopyTestCaseStmt) analysisResult.getStmt();
if (testCaseStmt.isTestCaseExport()) {
result.setStmt_type(TStmtType.TESTCASE);
result.setResult_set_metadata(new TResultSetMetadata(Collections.singletonList(
new TColumn("Test case data output path", Type.STRING.toThrift()))));
result.setTestcase_data_path(testCaseStmt.writeTestCaseData());
} else {
// Treat the test case import as a DDL operation.
result.setStmt_type(TStmtType.DDL);
createCatalogOpRequest(analysisResult, result);
}
return result;
} else if (analysisResult.isKillQueryStmt()) {
result.stmt_type = TStmtType.KILL;
result.setResult_set_metadata(new TResultSetMetadata(
Collections.singletonList(new TColumn("result", Type.STRING.toThrift()))));
result.setKill_query_request(analysisResult.getKillQueryStmt().toThrift());
return result;
}
// Open or continue a Kudu transaction if Kudu transactions are enabled and the
// target table is a Kudu table.
if (!analysisResult.isExplainStmt() && queryOptions.isEnable_kudu_transaction()) {
if ((analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt())
&& analysisResult.getInsertStmt().isTargetTableKuduTable()) {
// For INSERT/UPSERT/CTAS statements.
openOrContinueKuduTransaction(planCtx, queryCtx, analysisResult,
analysisResult.getInsertStmt().getTargetTable(), timeline);
} else if (analysisResult.isUpdateStmt()
&& analysisResult.getUpdateStmt().isTargetTableKuduTable()) {
// For UPDATE statement.
openOrContinueKuduTransaction(planCtx, queryCtx, analysisResult,
analysisResult.getUpdateStmt().getTargetTable(), timeline);
} else if (analysisResult.isDeleteStmt()
&& analysisResult.getDeleteStmt().isTargetTableKuduTable()) {
// For DELETE statement.
openOrContinueKuduTransaction(planCtx, queryCtx, analysisResult,
analysisResult.getDeleteStmt().getTargetTable(), timeline);
}
}
// If unset, set MT_DOP to 0 to simplify the rest of the code.
if (!queryOptions.isSetMt_dop()) queryOptions.setMt_dop(0);
Planner planner = new Planner(compilerFactory, analysisResult, queryCtx, timeline);
// Create the TQueryExecRequest by running the planner.
TQueryExecRequest queryExecRequest =
getPlannedExecRequest(planner, queryCtx, planCtx, analysisResult, timeline);
for (String column : analysisResult.getAnalyzer().joinColumns()) {
result.addToJoin_columns(column);
}
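// Attach the column lineage graph to the exec request if one was computed.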
TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph();
if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
queryExecRequest.setLineage_graph(thriftLineageGraph);
}
// Override the per_host_mem_estimate sent to the backend if needed. The explain
// string is already generated at this point so this does not change the estimate
// shown in the plan.
checkAndOverrideMemEstimate(queryExecRequest, queryOptions);
if (analysisResult.isExplainStmt()) {
// Return the EXPLAIN request
createExplainRequest(planCtx.getExplainString(), result);
return result;
}
result.setQuery_exec_request(queryExecRequest);
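// Set the statement type and, depending on the statement kind, the result
// metadata or finalization params.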
if (analysisResult.isQueryStmt()) {
result.stmt_type = TStmtType.QUERY;
result.query_exec_request.stmt_type = result.stmt_type;
// Use the parsed statement from the analysis result because the rewriter
// may have changed it.
ParsedStatement analyzedStmt = analysisResult.getParsedStmt();
// Fill in the result set metadata.
result.setResult_set_metadata(
planner.getTResultSetMetadata(analyzedStmt));
} else if (analysisResult.isInsertStmt() ||
analysisResult.isCreateTableAsSelectStmt()) {
// For CTAS, the overall TExecRequest statement type is DDL, but the
// query_exec_request should be DML.
result.stmt_type =
analysisResult.isCreateTableAsSelectStmt() ? TStmtType.DDL : TStmtType.DML;
result.query_exec_request.stmt_type = TStmtType.DML;
// Create finalization params for the insert statement.
addFinalizationParamsForInsert(
queryCtx, queryExecRequest, analysisResult.getInsertStmt());
} else {
Preconditions.checkState(
analysisResult.isUpdateStmt() || analysisResult.isDeleteStmt() ||
analysisResult.isOptimizeStmt() || analysisResult.isMergeStmt());
result.stmt_type = TStmtType.DML;
result.query_exec_request.stmt_type = TStmtType.DML;
if (analysisResult.isDeleteStmt()) {
addFinalizationParamsForIcebergModify(queryCtx, queryExecRequest,
analysisResult.getDeleteStmt(), TIcebergOperation.DELETE);
} else if (analysisResult.isUpdateStmt()) {
addFinalizationParamsForIcebergModify(queryCtx, queryExecRequest,
analysisResult.getUpdateStmt(), TIcebergOperation.UPDATE);
} else if (analysisResult.isOptimizeStmt()) {
addFinalizationParamsForIcebergOptimize(queryCtx, queryExecRequest,
analysisResult.getOptimizeStmt());
} else if (analysisResult.isMergeStmt()) {
addFinalizationParamsForIcebergModify(queryCtx, queryExecRequest,
analysisResult.getMergeStmt(), TIcebergOperation.MERGE);
}
}
return result;
} catch (Exception e) {
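// Compilation failed: abort any Hive or Kudu transaction opened above before
// rethrowing.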
if (queryCtx.isSetTransaction_id()) {
try {
planCtx.compilationState_.setWriteId(-1);
abortTransaction(queryCtx.getTransaction_id());
timeline.markEvent("Transaction aborted");
} catch (TransactionException te) {
LOG.error("Could not abort transaction because: " + te.getMessage());
}
} else if (queryCtx.isIs_kudu_transactional()) {
try {
planCtx.compilationState_.setKuduTransactionToken(null);
abortKuduTransaction(queryCtx.getQuery_id());
timeline.markEvent(
"Kudu transaction aborted: " + PrintId(queryCtx.getQuery_id()));
} catch (TransactionException te) {
LOG.error("Could not abort transaction because: " + te.getMessage());
}
}
throw e;
}
}