in xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java [68:139]
  private Map<String, String> getSparkTableProperties(
      List<String> partitionNames,
      String sparkVersion,
      int schemaLengthThreshold,
      InternalSchema schema) {
    List<InternalField> partitionCols = new ArrayList<>();
    List<InternalField> dataCols = new ArrayList<>();
    Map<String, InternalField> column2Field = new HashMap<>();
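    // Index the schema's fields by name so partition columns can be looked up below.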
    for (InternalField field : schema.getFields()) {
      column2Field.put(field.getName(), field);
    }
    // Get partition columns and data columns.
    for (String partitionName : partitionNames) {
      // Default unknown partition fields to String,
      // keeping the same logic as HiveSchemaUtil#getPartitionKeyType.
      partitionCols.add(
          column2Field.getOrDefault(
              partitionName,
              InternalField.builder()
                  .name(partitionName)
                  .schema(
                      InternalSchema.builder()
                          .dataType(InternalType.BYTES)
                          .isNullable(false)
                          .build())
                  .build()));
    }
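    // Any field not named as a partition column is treated as a data column.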
    for (InternalField field : schema.getFields()) {
      if (!partitionNames.contains(field.getName())) {
        dataCols.add(field);
      }
    }
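    // Rebuild the schema so data columns come first and partition columns last,
    // matching Spark's convention of listing partition columns at the end of the table schema.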
    List<InternalField> reOrderedFields = new ArrayList<>();
    reOrderedFields.addAll(dataCols);
    reOrderedFields.addAll(partitionCols);
    InternalSchema reorderedSchema =
        InternalSchema.builder()
            .fields(reOrderedFields)
            .dataType(InternalType.RECORD)
            .name(schema.getName())
            .build();
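    // Convert the reordered schema to a Spark StructType so it can be serialized as JSON below.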
    StructType sparkSchema = SparkSchemaExtractor.getInstance().fromInternalSchema(reorderedSchema);
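    // Identify the table as a Hudi datasource and record the Spark version that created it, when known.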
    Map<String, String> sparkProperties = new HashMap<>();
    sparkProperties.put("spark.sql.sources.provider", "hudi");
    if (!StringUtils.isNullOrEmpty(sparkVersion)) {
      sparkProperties.put("spark.sql.create.version", sparkVersion);
    }
    // Split the schema string into multiple parts according to the schemaLengthThreshold size.
    String schemaString = sparkSchema.json();
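    // Ceiling division: the number of threshold-sized parts needed to hold the full schema JSON.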
    int numSchemaPart = (schemaString.length() + schemaLengthThreshold - 1) / schemaLengthThreshold;
    sparkProperties.put("spark.sql.sources.schema.numParts", String.valueOf(numSchemaPart));
    // Add each part of the schema string to sparkProperties.
    for (int i = 0; i < numSchemaPart; i++) {
      int start = i * schemaLengthThreshold;
      int end = Math.min(start + schemaLengthThreshold, schemaString.length());
      sparkProperties.put("spark.sql.sources.schema.part." + i, schemaString.substring(start, end));
    }
    // Add partition columns.
    if (!partitionNames.isEmpty()) {
      sparkProperties.put(
          "spark.sql.sources.schema.numPartCols", String.valueOf(partitionNames.size()));
      for (int i = 0; i < partitionNames.size(); i++) {
        sparkProperties.put("spark.sql.sources.schema.partCol." + i, partitionNames.get(i));
      }
    }
    return sparkProperties;
  }