in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSemanticAnalyzer.java [1489:1759]
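    /**
     * Resolves metadata for everything referenced by the given {@link HiveParserQB}: source
     * tables (expanding views and CTEs, and rejecting recursive view definitions), subqueries
     * (recursively), and destination clauses, which may be tables, partitions, or directories.
     */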
private void getMetaData(HiveParserQB qb) throws HiveException {
LOG.info("Get metadata for source tables");
        // Go over the tables and populate the related structures. We have to materialize the
        // table alias list since we might modify it in the middle for view rewrite.
List<String> tabAliases = new ArrayList<>(qb.getTabAliases());
        // Keep track of view alias to view name.
        // E.g. for a query like 'select * from V3', where V3 -> V2, V2 -> V1, V1 -> T, this
        // records the full view names corresponding to the aliases V3, V3:V2, V3:V2:V1.
Map<String, String> aliasToViewInfo = new HashMap<>();
        // Used to capture view-to-subquery (SQ) conversions; consulted to detect recursive CTE
        // invocations.
Map<String, String> sqAliasToCTEName = new HashMap<>();
for (String alias : tabAliases) {
// tabName will always be "catalog.db.table"
String tabName = qb.getTabNameForAlias(alias);
ObjectIdentifier tableIdentifier = parseCompoundName(catalogRegistry, tabName);
            // get the original table name, i.e. "table", "db.table", or "catalog.db.table",
            // exactly as the user specified it
String originTabName = qb.getOriginTabNameForAlias(alias);
String cteName = originTabName.toLowerCase();
CatalogBaseTable tab = getCatalogBaseTable(tabName, qb, false);
if (tab == null
|| tableIdentifier
.getDatabaseName()
.equals(catalogRegistry.getCurrentDatabase())) {
                // we first look up this alias among the CTEs, and then in the catalog.
HiveParserBaseSemanticAnalyzer.CTEClause cte = findCTEFromName(qb, cteName);
if (cte != null) {
if (!cte.materialize) {
addCTEAsSubQuery(qb, cteName, alias);
sqAliasToCTEName.put(alias, cteName);
continue;
}
throw new SemanticException("Materializing CTE is not supported at the moment");
}
}
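            // the alias is not a CTE either, so a missing catalog table means it cannot be
            // resolved at all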
if (tab == null) {
HiveParserASTNode src = qb.getParseInfo().getSrcForAlias(alias);
if (null != src) {
throw new SemanticException(
HiveParserErrorMsg.getMsg(ErrorMsg.INVALID_TABLE, src));
} else {
throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(alias));
}
}
if (tab instanceof CatalogView) {
if (qb.getParseInfo().isAnalyzeCommand()) {
throw new SemanticException(ErrorMsg.ANALYZE_VIEW.getMsg());
}
// Prevent view cycles
if (viewsExpanded.contains(tabName)) {
throw new SemanticException(
"Recursive view "
+ tabName
+ " detected (cycle: "
+ StringUtils.join(viewsExpanded, " -> ")
+ " -> "
+ tabName
+ ").");
}
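                // expand the view: replace the reference with the view's definition and
                // remember the alias-to-view mapping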
replaceViewReferenceWithDefinition(qb, (CatalogView) tab, tabName, alias);
aliasToViewInfo.put(alias, tabName);
continue;
}
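            // a regular catalog table: register it as the source for this alias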
qb.getMetaData().setSrcForAlias(alias, tabName, (CatalogTable) tab);
}
LOG.info("Get metadata for subqueries");
// Go over the subqueries and getMetaData for these
for (String alias : qb.getSubqAliases()) {
boolean wasView = aliasToViewInfo.containsKey(alias);
boolean wasCTE = sqAliasToCTEName.containsKey(alias);
if (wasView) {
viewsExpanded.add(aliasToViewInfo.get(alias));
} else if (wasCTE) {
ctesExpanded.add(sqAliasToCTEName.get(alias));
}
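            // recursively resolve metadata for the subquery (or expanded view/CTE) body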
HiveParserQBExpr qbexpr = qb.getSubqForAlias(alias);
getMetaData(qbexpr);
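            // expansion finished: pop the name pushed above off the corresponding stack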
if (wasView) {
viewsExpanded.remove(viewsExpanded.size() - 1);
} else if (wasCTE) {
ctesExpanded.remove(ctesExpanded.size() - 1);
}
}
HiveParserBaseSemanticAnalyzer.HiveParserRowFormatParams rowFormatParams =
new HiveParserBaseSemanticAnalyzer.HiveParserRowFormatParams();
HiveParserStorageFormat storageFormat = new HiveParserStorageFormat(conf);
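        // row format and storage format may be filled in by the TOK_DIR branch below; they feed
        // the directory descriptor for 'insert overwrite directory'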
LOG.info("Get metadata for destination tables");
// Go over all the destination structures and populate the related metadata
HiveParserQBParseInfo qbp = qb.getParseInfo();
for (String name : qbp.getClauseNamesForDest()) {
HiveParserASTNode ast = qbp.getDestForClause(name);
switch (ast.getToken().getType()) {
case HiveASTParser.TOK_TAB:
{
TableSpec ts =
new TableSpec(catalogRegistry, conf, ast, frameworkConfig, cluster);
if (ts.table instanceof CatalogView) {
throw new SemanticException(ErrorMsg.DML_AGAINST_VIEW.getMsg());
}
boolean isTableWrittenTo =
qb.getParseInfo()
.isInsertIntoTable(ts.tableIdentifier.asSummaryString());
isTableWrittenTo |=
(qb.getParseInfo()
.getInsertOverwriteTables()
.get(
getUnescapedName(
(HiveParserASTNode) ast.getChild(0),
ts.tableIdentifier.getCatalogName(),
ts.tableIdentifier
.getDatabaseName()))
!= null);
assert isTableWrittenTo
: "Inconsistent data structure detected: we are writing to "
+ ts.tableIdentifier.asSummaryString()
+ " in "
+ name
+ " but it's not in isInsertIntoTable() or getInsertOverwriteTables()";
                        // TableSpec ts is obtained from the query (user specified). A non-static
                        // spec type means the user didn't specify partitions in the query, but
                        // whether the table itself is partitioned is not known.
if (ts.specType != SpecType.STATIC_PARTITION) {
// This is a table or dynamic partition
qb.getMetaData()
.setDestForAlias(
name,
ts.tableIdentifier.asSummaryString(),
(CatalogTable) ts.table);
// has dynamic as well as static partitions
if (ts.partSpec != null && ts.partSpec.size() > 0) {
qb.getMetaData().setPartSpecForAlias(name, ts.partSpec);
}
} else {
// rewrite QBMetaData
// This is a partition
qb.getMetaData()
.setDestForAlias(
name,
ts.tableIdentifier.asSummaryString(),
(CatalogTable) ts.table,
ts.partHandle);
}
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
// Add the table spec for the destination table.
qb.getParseInfo()
.addTableSpec(
ts.tableIdentifier.asSummaryString().toLowerCase(), ts);
}
break;
}
case HiveASTParser.TOK_DIR:
{
// This is a dfs file
String fname = stripQuotes(ast.getChild(0).getText());
if ((!qb.getParseInfo().getIsSubQ())
&& (((HiveParserASTNode) ast.getChild(0)).getToken().getType()
== HiveASTParser.TOK_TMP_FILE)) {
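                        // a TOK_TMP_FILE target means no explicit directory was given: this is
                        // either a CTAS / materialized view or a plain SELECT query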
if (qb.isCTAS() || qb.isMaterializedView()) {
qb.setIsQuery(false);
Path location;
// If the CTAS query does specify a location, use the table
// location, else use the db location
if (qb.getTableDesc() != null
&& qb.getTableDesc().getLocation() != null) {
location = new Path(qb.getTableDesc().getLocation());
} else {
                                // no table location available: fall back to the destination
                                // database's location
String tableName =
getUnescapedName((HiveParserASTNode) ast.getChild(0));
String[] names = Utilities.getDbTableName(tableName);
try {
Warehouse wh = new Warehouse(conf);
// Use destination table's db location.
String destTableDb =
qb.getTableDesc() != null
? qb.getTableDesc().getDatabaseName()
: null;
if (destTableDb == null) {
destTableDb = names[0];
}
location = wh.getDatabasePath(db.getDatabase(destTableDb));
} catch (MetaException e) {
throw new SemanticException(e);
}
}
if (HiveConf.getBoolVar(
conf, HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
TableSpec ts =
new TableSpec(
catalogRegistry,
conf,
this.ast,
frameworkConfig,
cluster);
// Add the table spec for the destination table.
qb.getParseInfo()
.addTableSpec(
ts.tableIdentifier
.asSummaryString()
.toLowerCase(),
ts);
}
} else {
// This is the only place where isQuery is set to true; it defaults
// to false.
qb.setIsQuery(true);
}
}
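                    // an optional second child carries the 'local' keyword; without it the
                    // destination directory is taken to be on DFS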
boolean isDfsFile =
ast.getChildCount() < 2
|| !ast.getChild(1).getText().equalsIgnoreCase("local");
                    // set the directory destination for the query (for CTAS, this is the
                    // destination of the inner SELECT)
qb.getMetaData().setDestForAlias(name, fname, isDfsFile);
                    // we use a dedicated class to represent 'insert overwrite directory'
HiveParserDirectoryDesc directoryDesc =
new HiveParserDirectoryDesc(rowFormatParams, storageFormat);
int numCh = ast.getChildCount();
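                    // scan the remaining children for storage format, row format, and serde
                    // specifications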
for (int num = 1; num < numCh; num++) {
HiveParserASTNode child = (HiveParserASTNode) ast.getChild(num);
if (child != null) {
if (storageFormat.fillStorageFormat(child)) {
continue;
}
switch (child.getToken().getType()) {
case HiveASTParser.TOK_TABLEROWFORMAT:
rowFormatParams.analyzeRowFormat(child);
break;
case HiveASTParser.TOK_TABLESERIALIZER:
HiveParserASTNode serdeChild =
(HiveParserASTNode) child.getChild(0);
storageFormat.setSerde(
unescapeSQLString(
serdeChild.getChild(0).getText()));
if (serdeChild.getChildCount() > 1) {
readProps(
(HiveParserASTNode)
serdeChild.getChild(1).getChild(0),
storageFormat.getSerdeProps());
}
break;
}
}
}
qb.setDirectoryDesc(directoryDesc);
break;
}
default:
throw new SemanticException(
HiveParserUtils.generateErrorMessage(
ast, "Unknown Token Type " + ast.getToken().getType()));
}
}
}