in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java [1806:1909]
public static RelNode genValues(
String tabAlias,
ResolvedCatalogTable catalogTable,
HiveParserRowResolver rowResolver,
RelOptCluster cluster,
List<List<String>> values) {
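        // collect the Hive type info of every column in the target table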
List<TypeInfo> tmpTableTypes = new ArrayList<>();
DataType[] dataTypes =
catalogTable.getResolvedSchema().getColumnDataTypes().toArray(new DataType[0]);
for (DataType dataType : dataTypes) {
tmpTableTypes.add(HiveTypeUtil.toHiveTypeInfo(dataType, false));
}
RexBuilder rexBuilder = cluster.getRexBuilder();
// calcite types for each field
List<RelDataType> calciteTargetTypes =
tmpTableTypes.stream()
.map(
ti ->
HiveParserTypeConverter.convert(
(PrimitiveTypeInfo) ti,
rexBuilder.getTypeFactory()))
.collect(Collectors.toList());
// calcite field names
List<String> calciteFieldNames =
IntStream.range(0, calciteTargetTypes.size())
.mapToObj(SqlUtil::deriveAliasFromOrdinal)
.collect(Collectors.toList());
// calcite type for each row
List<RelDataType> calciteRowTypes = new ArrayList<>();
List<List<RexLiteral>> rows = new ArrayList<>();
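        // convert each row of string values into a row of Calcite literals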
for (List<String> value : values) {
Preconditions.checkArgument(
value.size() == tmpTableTypes.size(),
                String.format(
                        "Values table column count (%d) does not match row value count (%d)",
                        tmpTableTypes.size(), value.size()));
List<RexLiteral> row = new ArrayList<>();
for (int i = 0; i < tmpTableTypes.size(); i++) {
PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) tmpTableTypes.get(i);
RelDataType calciteType = calciteTargetTypes.get(i);
String col = value.get(i);
if (col == null) {
row.add(rexBuilder.makeNullLiteral(calciteType));
} else {
switch (primitiveTypeInfo.getPrimitiveCategory()) {
case BYTE:
case SHORT:
case INT:
case LONG:
row.add(rexBuilder.makeExactLiteral(new BigDecimal(col), calciteType));
break;
case DECIMAL:
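                            // mirror Hive's behavior: a decimal that does not fit the
                            // target precision/scale becomes NULL instead of failing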
BigDecimal bigDec = new BigDecimal(col);
row.add(
SqlTypeUtil.isValidDecimalValue(bigDec, calciteType)
? rexBuilder.makeExactLiteral(bigDec, calciteType)
: rexBuilder.makeNullLiteral(calciteType));
break;
case FLOAT:
case DOUBLE:
row.add(rexBuilder.makeApproxLiteral(new BigDecimal(col), calciteType));
break;
case BOOLEAN:
row.add(rexBuilder.makeLiteral(Boolean.parseBoolean(col)));
break;
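                        // remaining primitive categories (e.g. STRING, DATE, TIMESTAMP)
                        // fall back to char literals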
default:
row.add(
rexBuilder.makeCharLiteral(
HiveParserUtils.asUnicodeString(col)));
}
}
}
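            // record this row's struct type; rows can differ (e.g. NULL fields),
            // so a common type is computed below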
calciteRowTypes.add(
rexBuilder
.getTypeFactory()
.createStructType(
row.stream()
.map(RexLiteral::getType)
.collect(Collectors.toList()),
calciteFieldNames));
rows.add(row);
}
        // compute the final row type as the least restrictive common type of the
        // per-row struct types (e.g. a field that is NULL in one row and INT in
        // another unifies to nullable INT)
RelDataType calciteRowType = rexBuilder.getTypeFactory().leastRestrictive(calciteRowTypes);
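        // expose each generated column, under its derived alias, through the row resolver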
for (int i = 0; i < calciteFieldNames.size(); i++) {
ColumnInfo colInfo =
new ColumnInfo(
calciteFieldNames.get(i),
HiveParserTypeConverter.convert(
calciteRowType.getFieldList().get(i).getType()),
tabAlias,
false);
rowResolver.put(tabAlias, calciteFieldNames.get(i), colInfo);
}
return HiveParserUtils.genValuesRelNode(
cluster,
rexBuilder.getTypeFactory().createStructType(calciteRowType.getFieldList()),
rows);
}
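
A minimal usage sketch (hypothetical variable names, not the actual call site): assuming the surrounding planner code already provides a ResolvedCatalogTable `catalogTable` for a three-column table (INT, DECIMAL(4, 2), BOOLEAN), a HiveParserRowResolver `rowResolver`, and a RelOptCluster `cluster`, an inline VALUES clause could be materialized as:

    List<List<String>> values =
            Arrays.asList(
                    Arrays.asList("1", "12.34", "true"),
                    Arrays.asList("2", null, "false")); // null becomes a NULL literal
    RelNode valuesNode =
            HiveParserBaseSemanticAnalyzer.genValues(
                    "values_alias", catalogTable, rowResolver, cluster, values);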