in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSemanticAnalyzer.java [1815:2064]
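/**
 * Expands a column regex (e.g. the "*" in "SELECT *" or "SELECT tab.*") against the
 * row resolver colSrcRR: each matching, non-hidden column is appended to colList as an
 * ExprNodeColumnDesc and registered in the output row resolver, starting at position
 * pos. Returns the position counter after the last column added.
 */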
public Integer genColListRegex(
String colRegex,
String tabAlias,
HiveParserASTNode sel,
ArrayList<ExprNodeDesc> colList,
HashSet<ColumnInfo> excludeCols,
HiveParserRowResolver input,
HiveParserRowResolver colSrcRR,
Integer pos,
HiveParserRowResolver output,
List<String> aliases,
boolean ensureUniqueCols)
throws SemanticException {
if (colSrcRR == null) {
colSrcRR = input;
}
// The table alias should exist
if (tabAlias != null && !colSrcRR.hasTableAlias(tabAlias)) {
throw new SemanticException(
HiveParserErrorMsg.getMsg(ErrorMsg.INVALID_TABLE_ALIAS, sel));
}
// TODO: Have to put in the support for AS clause
Pattern regex;
try {
regex = Pattern.compile(colRegex, Pattern.CASE_INSENSITIVE);
} catch (PatternSyntaxException e) {
throw new SemanticException(
HiveParserErrorMsg.getMsg(ErrorMsg.INVALID_COLUMN, sel, e.getMessage()));
}
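// Note (an assumption about callers, not stated in this file): a plain "SELECT *"
// reaches this method as colRegex ".*", which matches every column name; Hive also
// accepts an explicit column regex such as "(key|value)" when
// hive.support.quoted.identifiers is set to "none".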
StringBuilder replacementText = new StringBuilder();
int matched = 0;
// Add the empty string to the list of aliases. Some operators (e.g. GroupBy) add
// ColumnInfos for table alias "".
if (!aliases.contains("")) {
aliases.add("");
}
/*
 * Track the input ColumnInfos that are added to the output. If a ColumnInfo has
 * multiple mappings, add the column only once, but carry the mappings forward.
 */
Map<ColumnInfo, ColumnInfo> inputColsProcessed = new HashMap<>();
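// For example, after a join the same input ColumnInfo may be reachable under more
// than one (alias, column) mapping; it is materialized in colList only once, but
// put into the output row resolver for every mapping.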
// For expr "*", aliases should be iterated in the order they are specified in the query.
if (colSrcRR.getNamedJoinInfo() != null) {
// The previous join had a USING clause, so the select list must be generated per
// the SQL standard: for "*", the join columns come first, non-repeated, followed
// by the other columns.
HashMap<String, ColumnInfo> leftMap =
colSrcRR.getFieldMap(colSrcRR.getNamedJoinInfo().getAliases().get(0));
HashMap<String, ColumnInfo> rightMap =
colSrcRR.getFieldMap(colSrcRR.getNamedJoinInfo().getAliases().get(1));
// For a RIGHT OUTER join, the surviving values of the USING columns come from the
// right input; otherwise they come from the left input.
HashMap<String, ColumnInfo> chosenMap =
colSrcRR.getNamedJoinInfo().getHiveJoinType() != JoinType.RIGHTOUTER
? leftMap
: rightMap;
// First emit the named (USING) join columns.
for (String columnName : colSrcRR.getNamedJoinInfo().getNamedColumns()) {
for (Map.Entry<String, ColumnInfo> entry : chosenMap.entrySet()) {
ColumnInfo colInfo = entry.getValue();
if (!columnName.equals(colInfo.getAlias())) {
continue;
}
String name = colInfo.getInternalName();
String[] tmp = colSrcRR.reverseLookup(name);
// Skip the ColumnInfos that are not for this particular alias.
if (tabAlias != null && !tmp[0].equalsIgnoreCase(tabAlias)) {
continue;
}
if (colInfo.getIsVirtualCol() && colInfo.isHiddenVirtualCol()) {
continue;
}
ColumnInfo oColInfo = inputColsProcessed.get(colInfo);
if (oColInfo == null) {
ExprNodeColumnDesc expr =
new ExprNodeColumnDesc(
colInfo.getType(),
name,
colInfo.getTabAlias(),
colInfo.getIsVirtualCol(),
colInfo.isSkewedCol());
colList.add(expr);
oColInfo =
new ColumnInfo(
getColumnInternalName(pos),
colInfo.getType(),
colInfo.getTabAlias(),
colInfo.getIsVirtualCol(),
colInfo.isHiddenVirtualCol());
inputColsProcessed.put(colInfo, oColInfo);
}
if (ensureUniqueCols) {
if (!output.putWithCheck(tmp[0], tmp[1], null, oColInfo)) {
throw new SemanticException(
"Cannot add column to RR: "
+ tmp[0]
+ "."
+ tmp[1]
+ " => "
+ oColInfo
+ " due to duplication, see previous warnings");
}
} else {
output.put(tmp[0], tmp[1], oColInfo);
}
pos++;
matched++;
if (unparseTranslator.isEnabled()) {
if (replacementText.length() > 0) {
replacementText.append(", ");
}
replacementText.append(HiveUtils.unparseIdentifier(tmp[0], conf));
replacementText.append(".");
replacementText.append(HiveUtils.unparseIdentifier(tmp[1], conf));
}
}
}
}
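// General expansion: iterate the aliases in query order and emit every column whose
// unqualified name matches the regex; columns already emitted for a USING clause
// above are skipped inside the loop.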
for (String alias : aliases) {
HashMap<String, ColumnInfo> fMap = colSrcRR.getFieldMap(alias);
if (fMap == null) {
continue;
}
// For the tab.* case, add all of the alias's columns from the input schema to the column list.
for (Map.Entry<String, ColumnInfo> entry : fMap.entrySet()) {
ColumnInfo colInfo = entry.getValue();
if (colSrcRR.getNamedJoinInfo() != null
&& colSrcRR.getNamedJoinInfo()
.getNamedColumns()
.contains(colInfo.getAlias())) {
// We already added this column to the select list above.
continue;
}
if (excludeCols != null && excludeCols.contains(colInfo)) {
continue; // This was added during plan generation.
}
// First, look up the column in the source against which "*" is to be resolved.
// We'll later translate this into the column from the proper input, if it's valid.
// TODO: excludeCols may be possible to remove using the same technique.
String name = colInfo.getInternalName();
String[] tmp = colSrcRR.reverseLookup(name);
// Skip the ColumnInfos that are not for this particular alias.
if (tabAlias != null && !tmp[0].equalsIgnoreCase(tabAlias)) {
continue;
}
if (colInfo.getIsVirtualCol() && colInfo.isHiddenVirtualCol()) {
continue;
}
// Skip columns whose unqualified names don't match the regex.
if (!regex.matcher(tmp[1]).matches()) {
continue;
}
// If the input (e.g. a GroupBy) is different from the source of the columns,
// find the same column in the input.
// TODO: This is fraught with peril.
if (input != colSrcRR) {
colInfo = input.get(tabAlias, tmp[1]);
if (colInfo == null) {
LOG.error(
"Cannot find colInfo for "
+ tabAlias
+ "."
+ tmp[1]
+ ", derived from ["
+ colSrcRR
+ "], in ["
+ input
+ "]");
throw new SemanticException(ErrorMsg.NON_KEY_EXPR_IN_GROUPBY, tmp[1]);
}
String oldCol = null;
if (LOG.isDebugEnabled()) {
oldCol = name + " => " + (tmp == null ? "null" : (tmp[0] + "." + tmp[1]));
}
name = colInfo.getInternalName();
tmp = input.reverseLookup(name);
if (LOG.isDebugEnabled()) {
String newCol =
name + " => " + (tmp == null ? "null" : (tmp[0] + "." + tmp[1]));
LOG.debug("Translated [" + oldCol + "] to [" + newCol + "]");
}
}
ColumnInfo oColInfo = inputColsProcessed.get(colInfo);
if (oColInfo == null) {
ExprNodeColumnDesc expr =
new ExprNodeColumnDesc(
colInfo.getType(),
name,
colInfo.getTabAlias(),
colInfo.getIsVirtualCol(),
colInfo.isSkewedCol());
colList.add(expr);
oColInfo =
new ColumnInfo(
getColumnInternalName(pos),
colInfo.getType(),
colInfo.getTabAlias(),
colInfo.getIsVirtualCol(),
colInfo.isHiddenVirtualCol());
inputColsProcessed.put(colInfo, oColInfo);
}
if (ensureUniqueCols) {
if (!output.putWithCheck(tmp[0], tmp[1], null, oColInfo)) {
throw new SemanticException(
"Cannot add column to RR: "
+ tmp[0]
+ "."
+ tmp[1]
+ " => "
+ oColInfo
+ " due to duplication, see previous warnings");
}
} else {
output.put(tmp[0], tmp[1], oColInfo);
}
pos++;
matched++;
if (unparseTranslator.isEnabled()) {
if (replacementText.length() > 0) {
replacementText.append(", ");
}
replacementText.append(HiveUtils.unparseIdentifier(tmp[0], conf));
replacementText.append(".");
replacementText.append(HiveUtils.unparseIdentifier(tmp[1], conf));
}
}
}
if (matched == 0) {
throw new SemanticException(HiveParserErrorMsg.getMsg(ErrorMsg.INVALID_COLUMN, sel));
}
if (unparseTranslator.isEnabled()) {
unparseTranslator.addTranslation(sel, replacementText.toString());
}
return pos;
}
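// Hypothetical call-site sketch (an assumption for illustration, not copied from the
// actual caller): expanding a bare "*" over all aliases of the input might look like
//
//   pos = genColListRegex(".*", null /* no table qualifier */, selExpr, colList,
//           null /* no excluded columns */, inputRR, null /* resolve against input */,
//           pos, outputRR, aliases, false);
//
// whereas "tab.*" would pass "tab" as tabAlias, so only columns that reverse-map to
// that alias survive the tmp[0] check above.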