in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java [1975:2079]
public static void validatePartColumnType(
ResolvedCatalogTable resolvedCatalogTable,
Map<String, String> partSpec,
HiveParserASTNode astNode,
HiveConf conf,
FrameworkConfig frameworkConfig,
RelOptCluster cluster)
throws SemanticException {
if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_TYPE_CHECK_ON_INSERT)) {
return;
}
Map<HiveParserASTNode, ExprNodeDesc> astExprNodeMap = new HashMap<>();
if (!getPartExprNodeDesc(astNode, conf, astExprNodeMap, frameworkConfig, cluster)) {
LOG.warn(
"Dynamic partitioning is used; only validating "
+ astExprNodeMap.size()
+ " columns");
}
if (astExprNodeMap.isEmpty()) {
return; // All columns are dynamic, nothing to do.
}
List<String> parts = resolvedCatalogTable.getPartitionKeys();
Map<String, TypeInfo> partColsTypes =
CollectionUtil.newHashMapWithExpectedSize(parts.size());
for (String col : parts) {
Optional<DataType> dataType =
resolvedCatalogTable
.getResolvedSchema()
.getColumn(col)
.map(Column::getDataType);
TypeInfo hiveType =
HiveTypeUtil.toHiveTypeInfo(
dataType.orElseThrow(
() ->
new SemanticException(
String.format(
"Can't get data type for column %s.",
col))),
false);
partColsTypes.put(col, hiveType);
}
for (Map.Entry<HiveParserASTNode, ExprNodeDesc> astExprNodePair :
astExprNodeMap.entrySet()) {
String astKeyName = astExprNodePair.getKey().toString().toLowerCase();
if (astExprNodePair.getKey().getType() == HiveASTParser.Identifier) {
astKeyName = stripIdentifierQuotes(astKeyName);
}
TypeInfo expectedType = partColsTypes.get(astKeyName);
ObjectInspector inputOI =
TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
astExprNodePair.getValue().getTypeInfo());
ObjectInspector outputOI =
TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(expectedType);
// Since partVal is a constant, it is safe to cast ExprNodeDesc to
// ExprNodeConstantDesc.
// Its value should be in normalized format (e.g. no leading zero in integer, date is
// in
// format of YYYY-MM-DD etc)
Object value = ((ExprNodeConstantDesc) astExprNodePair.getValue()).getValue();
Object convertedValue = value;
if (!inputOI.getTypeName().equals(outputOI.getTypeName())) {
convertedValue =
ObjectInspectorConverters.getConverter(inputOI, outputOI).convert(value);
if (convertedValue == null) {
throw new SemanticException(
ErrorMsg.PARTITION_SPEC_TYPE_MISMATCH,
astKeyName,
inputOI.getTypeName(),
outputOI.getTypeName());
}
if (!convertedValue.toString().equals(value.toString())) {
// value might have been changed because of the normalization in conversion
LOG.warn(
"Partition "
+ astKeyName
+ " expects type "
+ outputOI.getTypeName()
+ " but input value is in type "
+ inputOI.getTypeName()
+ ". Convert "
+ value
+ " to "
+ convertedValue);
}
}
if (!convertedValue.toString().equals(partSpec.get(astKeyName))) {
LOG.warn(
"Partition Spec "
+ astKeyName
+ "="
+ partSpec.get(astKeyName)
+ " has been changed to "
+ astKeyName
+ "="
+ convertedValue);
}
partSpec.put(astKeyName, convertedValue.toString());
}
}