private Map<String, String> getSparkTableProperties()

in xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java [68:139]

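Builds the spark.sql.sources.* table properties that Spark reads back from the catalog: the datasource provider, the Spark version that created the table, the schema serialized as JSON and split into parts of at most schemaLengthThreshold characters, and the ordered list of partition columns.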

  private Map<String, String> getSparkTableProperties(
      List<String> partitionNames,
      String sparkVersion,
      int schemaLengthThreshold,
      InternalSchema schema) {
    List<InternalField> partitionCols = new ArrayList<>();
    List<InternalField> dataCols = new ArrayList<>();
    Map<String, InternalField> column2Field = new HashMap<>();

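    // Index every schema field by name for quick lookup of partition columns.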
    for (InternalField field : schema.getFields()) {
      column2Field.put(field.getName(), field);
    }
    // Get partition columns and data columns.
    for (String partitionName : partitionNames) {
      // Default unknown partition fields to String, keeping the same logic as
      // HiveSchemaUtil#getPartitionKeyType.
      partitionCols.add(
          column2Field.getOrDefault(
              partitionName,
              InternalField.builder()
                  .name(partitionName)
                  .schema(
                      InternalSchema.builder()
                          .dataType(InternalType.STRING)
                          .isNullable(false)
                          .build())
                  .build()));
    }

    for (InternalField field : schema.getFields()) {
      if (!partitionNames.contains(field.getName())) {
        dataCols.add(field);
      }
    }

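    // Spark expects partition columns at the end of a datasource table's
    // schema, so list data columns first.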
    List<InternalField> reOrderedFields = new ArrayList<>(dataCols);
    reOrderedFields.addAll(partitionCols);
    InternalSchema reorderedSchema =
        InternalSchema.builder()
            .fields(reOrderedFields)
            .dataType(InternalType.RECORD)
            .name(schema.getName())
            .build();

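    // Convert to Spark's StructType so the schema can be serialized to JSON below.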
    StructType sparkSchema = SparkSchemaExtractor.getInstance().fromInternalSchema(reorderedSchema);

    Map<String, String> sparkProperties = new HashMap<>();
    sparkProperties.put("spark.sql.sources.provider", "hudi");
    if (!StringUtils.isNullOrEmpty(sparkVersion)) {
      sparkProperties.put("spark.sql.create.version", sparkVersion);
    }
    // Split the schema string into parts of at most schemaLengthThreshold characters.
    String schemaString = sparkSchema.json();
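    // Ceiling division: how many chunks of size schemaLengthThreshold are needed.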
    int numSchemaPart = (schemaString.length() + schemaLengthThreshold - 1) / schemaLengthThreshold;
    sparkProperties.put("spark.sql.sources.schema.numParts", String.valueOf(numSchemaPart));
    // Add each part of the schema string to the properties map.
    for (int i = 0; i < numSchemaPart; i++) {
      int start = i * schemaLengthThreshold;
      int end = Math.min(start + schemaLengthThreshold, schemaString.length());
      sparkProperties.put("spark.sql.sources.schema.part." + i, schemaString.substring(start, end));
    }
    // Add partition columns
    if (!partitionNames.isEmpty()) {
      sparkProperties.put(
          "spark.sql.sources.schema.numPartCols", String.valueOf(partitionNames.size()));
      for (int i = 0; i < partitionNames.size(); i++) {
        sparkProperties.put("spark.sql.sources.schema.partCol." + i, partitionNames.get(i));
      }
    }
    return sparkProperties;
  }
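
For context, here is a minimal sketch of the read side, showing why the numbered part keys matter: Spark recovers the schema by concatenating the parts in order and parsing the result with DataType.fromJson. The class name and sample values below are illustrative, not part of XTable.

  import java.util.HashMap;
  import java.util.Map;
  import org.apache.spark.sql.types.DataType;
  import org.apache.spark.sql.types.StructType;

  public class SchemaPartsReadExample {
    public static void main(String[] args) {
      // Hypothetical properties map, shaped like the output of
      // getSparkTableProperties after a two-part schema split.
      Map<String, String> props = new HashMap<>();
      props.put("spark.sql.sources.schema.numParts", "2");
      props.put(
          "spark.sql.sources.schema.part.0",
          "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"lo");
      props.put(
          "spark.sql.sources.schema.part.1",
          "ng\",\"nullable\":false,\"metadata\":{}}]}");

      // Concatenating the parts in order restores the full JSON, even when a
      // split landed in the middle of a token.
      int numParts = Integer.parseInt(props.get("spark.sql.sources.schema.numParts"));
      StringBuilder json = new StringBuilder();
      for (int i = 0; i < numParts; i++) {
        json.append(props.get("spark.sql.sources.schema.part." + i));
      }
      StructType schema = (StructType) DataType.fromJson(json.toString());
      System.out.println(schema.treeString()); // root |-- id: long (nullable = false)
    }
  }

The split exists because metastores commonly cap the length of a single table-property value (Hive's backing column for property values is a few thousand characters wide in many databases), so a large schema cannot be stored under one key.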