in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserScriptTransformHelper.java [85:293]
    public RelNode genScriptPlan(
            HiveParserASTNode trfm, HiveParserQB qb, List<RexNode> operands, RelNode input)
            throws SemanticException {
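        // Plans a Hive "SELECT TRANSFORM" query. An illustrative example of the kind of
        // statement this method handles (script name is hypothetical):
        //   SELECT TRANSFORM(key, value) USING 'my_script.py' AS (k STRING, v STRING) FROM src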
        // if any operand is not a RexInputRef, create a project node
        // before creating the script transform node
        boolean isAllRexRef = operands.stream().allMatch(node -> node instanceof RexInputRef);
        int[] transformFieldIndices;
        if (!isAllRexRef) {
            input =
                    LogicalProject.create(
                            input, Collections.emptyList(), operands, (List<String>) null);
            transformFieldIndices = IntStream.range(0, operands.size()).toArray();
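            // the transform then consumes all of the project's output fields, in order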
            HiveParserRowResolver rowResolver = new HiveParserRowResolver();
            // record the column info for the project node
            for (int i = 0; i < operands.size(); i++) {
                ColumnInfo oColInfo =
                        new ColumnInfo(
                                getColumnInternalName(i),
                                HiveParserTypeConverter.convert(operands.get(i).getType()),
                                null,
                                false);
                rowResolver.put(null, getColumnInternalName(i), oColInfo);
            }
            relToRowResolver.put(input, rowResolver);
        } else {
            transformFieldIndices =
                    operands.stream()
                            .flatMapToInt(node -> IntStream.of(((RexInputRef) node).getIndex()))
                            .toArray();
        }
        ArrayList<ColumnInfo> inputSchema = relToRowResolver.get(input).getColumnInfos();
        // If there is no "AS" clause, the output schema will be "key,value"
        ArrayList<ColumnInfo> outputCols = new ArrayList<>();
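        // positions of the relevant children under the TOK_TRANSFORM AST node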
        int inputSerDeNum = 1, inputRecordWriterNum = 2;
        int outputSerDeNum = 4, outputRecordReaderNum = 5;
        int outputColsNum = 6;
        boolean outputColNames = false, outputColSchemas = false;
        int execPos = 3;
        boolean defaultOutputCols = false;
        // check whether the query specifies an "AS" clause for the output columns
        if (trfm.getChildCount() > outputColsNum) {
            HiveParserASTNode outCols = (HiveParserASTNode) trfm.getChild(outputColsNum);
            if (outCols.getType() == HiveASTParser.TOK_ALIASLIST) {
                outputColNames = true;
            } else if (outCols.getType() == HiveASTParser.TOK_TABCOLLIST) {
                outputColSchemas = true;
            }
        }
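        // TOK_ALIASLIST corresponds to "AS (a, b)" (names only), TOK_TABCOLLIST to
        // "AS (a INT, b STRING)" (names and types)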
        // If no output column names or types are specified, default to (key string, value string)
        if (!outputColNames && !outputColSchemas) {
            // output schema will be "key, value"
            String[] outputAlias = new String[] {"key", "value"};
            for (int i = 0; i < outputAlias.length; i++) {
                String intName = getColumnInternalName(i);
                ColumnInfo colInfo =
                        new ColumnInfo(intName, TypeInfoFactory.stringTypeInfo, null, false);
                colInfo.setAlias(outputAlias[i]);
                outputCols.add(colInfo);
            }
            defaultOutputCols = true;
        } else {
            // column names (and possibly types) were specified in the "AS" clause
            HiveParserASTNode collist = (HiveParserASTNode) trfm.getChild(outputColsNum);
            int ccount = collist.getChildCount();
            Set<String> colAliasNamesDuplicateCheck = new HashSet<>();
            for (int i = 0; i < ccount; i++) {
                ColumnInfo colInfo =
                        getColumnInfoInScriptTransform(
                                (HiveParserASTNode) collist.getChild(i),
                                outputColSchemas,
                                i,
                                colAliasNamesDuplicateCheck);
                outputCols.add(colInfo);
            }
        }
        // input schema info: comma-separated column name and type lists for the input SerDe
        StringBuilder inpColumns = new StringBuilder();
        StringBuilder inpColumnTypes = new StringBuilder();
        for (int i = 0; i < transformFieldIndices.length; i++) {
            if (i != 0) {
                inpColumns.append(",");
                inpColumnTypes.append(",");
            }
            inpColumns.append(inputSchema.get(transformFieldIndices[i]).getInternalName());
            inpColumnTypes.append(
                    inputSchema.get(transformFieldIndices[i]).getType().getTypeName());
        }
        // output schema info: column name/type lists plus the row resolver and Calcite
        // types for the transform's output
        StringBuilder outColumns = new StringBuilder();
        StringBuilder outColumnTypes = new StringBuilder();
        List<RelDataType> outDataTypes = new ArrayList<>();
        List<String> outColNames = new ArrayList<>();
        HiveParserRowResolver scriptRR = new HiveParserRowResolver();
        RelDataTypeFactory dtFactory = cluster.getRexBuilder().getTypeFactory();
        for (int i = 0; i < outputCols.size(); i++) {
            if (i != 0) {
                outColumns.append(",");
                outColumnTypes.append(",");
            }
            outColumns.append(outputCols.get(i).getInternalName());
            outColumnTypes.append(outputCols.get(i).getType().getTypeName());
            scriptRR.put(
                    qb.getParseInfo().getAlias(), outputCols.get(i).getAlias(), outputCols.get(i));
            outDataTypes.add(HiveParserUtils.toRelDataType(outputCols.get(i).getType(), dtFactory));
            outColNames.add(outputCols.get(i).getInternalName());
        }
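        // default SerDe and field separator for the data exchanged with the script;
        // Ctrl-A replaces tab when script escaping (HIVESCRIPTESCAPE) is enabled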
        String serdeName = LazySimpleSerDe.class.getName();
        int fieldSeparator = Utilities.tabCode;
        if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVESCRIPTESCAPE)) {
            fieldSeparator = Utilities.ctrlaCode;
        }
        // Input and Output SerDes
        HiveParserBaseSemanticAnalyzer.SerDeClassProps inSerDeClassProps;
        if (trfm.getChild(inputSerDeNum).getChildCount() > 0) {
            // use the user-specified serialization class and properties
            HiveParserASTNode inputSerDeNode = (HiveParserASTNode) trfm.getChild(inputSerDeNum);
            inSerDeClassProps =
                    HiveParserBaseSemanticAnalyzer.SerDeClassProps.analyzeSerDeInfo(
                            (HiveParserASTNode) inputSerDeNode.getChild(0),
                            inpColumns.toString(),
                            inpColumnTypes.toString(),
                            false);
        } else {
            // use the default serialization class and properties
            Map<String, String> inSerdeProps =
                    HiveParserBaseSemanticAnalyzer.SerDeClassProps.getDefaultSerDeProps(
                            serdeName,
                            String.valueOf(fieldSeparator),
                            inpColumns.toString(),
                            inpColumnTypes.toString(),
                            false,
                            true);
            inSerDeClassProps =
                    new HiveParserBaseSemanticAnalyzer.SerDeClassProps(serdeName, inSerdeProps);
        }
        HiveParserBaseSemanticAnalyzer.SerDeClassProps outSerDeClassProps;
        if (trfm.getChild(outputSerDeNum).getChildCount() > 0) {
            // use the user-specified deserialization class and properties
            HiveParserASTNode outSerDeNode = (HiveParserASTNode) trfm.getChild(outputSerDeNum);
            outSerDeClassProps =
                    HiveParserBaseSemanticAnalyzer.SerDeClassProps.analyzeSerDeInfo(
                            (HiveParserASTNode) outSerDeNode.getChild(0),
                            outColumns.toString(),
                            outColumnTypes.toString(),
                            false);
        } else {
            // use the default deserialization class and properties
            Map<String, String> outSerdeProps =
                    HiveParserBaseSemanticAnalyzer.SerDeClassProps.getDefaultSerDeProps(
                            serdeName,
                            String.valueOf(fieldSeparator),
                            outColumns.toString(),
                            outColumnTypes.toString(),
                            defaultOutputCols,
                            true);
            outSerDeClassProps =
                    new HiveParserBaseSemanticAnalyzer.SerDeClassProps(serdeName, outSerdeProps);
        }
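        // the record writer pushes serialized rows into the script process; the record
        // reader consumes the rows the script emits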
        // script input record writer, defaulting to TextRecordWriter if none is specified
        Tree recordWriterASTNode = trfm.getChild(inputRecordWriterNum);
        String inRecordWriter =
                recordWriterASTNode.getChildCount() == 0
                        ? TextRecordWriter.class.getName()
                        : unescapeSQLString(recordWriterASTNode.getChild(0).getText());
        // script output record reader, defaulting to TextRecordReader if none is specified
        Tree recordReaderASTNode = trfm.getChild(outputRecordReaderNum);
        String outRecordReader =
                recordReaderASTNode.getChildCount() == 0
                        ? TextRecordReader.class.getName()
                        : unescapeSQLString(recordReaderASTNode.getChild(0).getText());
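        // assemble the transform's output row type, the script command line, and the
        // I/O metadata consumed by LogicalScriptTransform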
        RelDataType rowDataType = dtFactory.createStructType(outDataTypes, outColNames);
        String script = unescapeSQLString(trfm.getChild(execPos).getText());
        ScriptTransformIOInfo inputOutSchema =
                new ScriptTransformIOInfo(
                        inSerDeClassProps.getSerdeClassName(),
                        inSerDeClassProps.getProperties(),
                        outSerDeClassProps.getSerdeClassName(),
                        outSerDeClassProps.getProperties(),
                        inRecordWriter,
                        outRecordReader,
                        new JobConfWrapper(new JobConf(hiveConf)));
        LogicalScriptTransform scriptTransform =
                LogicalScriptTransform.create(
                        input, transformFieldIndices, script, inputOutSchema, rowDataType);
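        // register the row resolver and column mapping so later planning stages can
        // resolve the transform's output columns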
        relToHiveColNameCalcitePosMap.put(scriptTransform, buildHiveToCalciteColumnMap(scriptRR));
        relToRowResolver.put(scriptTransform, scriptRR);
        // TODO: add URI entity for the transform script; the script is assumed to be
        // local unless downloadable
        return scriptTransform;
    }