public Schema toIceberg()

in xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java [65:100]


  public Schema toIceberg(InternalSchema internalSchema) {
    // if field IDs are not assigned in the source, just use an incrementing integer
    AtomicInteger fieldIdTracker = new AtomicInteger(0);
    List<Types.NestedField> nestedFields = convertFields(internalSchema, fieldIdTracker);
    List<InternalField> recordKeyFields = internalSchema.getRecordKeyFields();
    boolean recordKeyFieldsAreNotRequired =
        recordKeyFields.stream().anyMatch(f -> f.getSchema().isNullable());
    // Iceberg requires the identifier fields to be required fields, so if any of the record key
    // fields are nullable, we cannot add the identifier fields to the schema properties.
    if (!recordKeyFields.isEmpty() && recordKeyFieldsAreNotRequired) {
      log.warn(
          "Record key fields are not required. Not setting record key fields in iceberg schema.");
    }
    if (recordKeyFields.isEmpty() || recordKeyFieldsAreNotRequired) {
      return new Schema(nestedFields);
    }
    // Find field in iceberg schema that matches each of the record key path and collect ids.
    Schema partialSchema = new Schema(nestedFields);
    Set<Integer> recordKeyIds =
        recordKeyFields.stream()
            .map(keyField -> partialSchema.findField(convertFromXTablePath(keyField.getPath())))
            .filter(Objects::nonNull)
            .map(Types.NestedField::fieldId)
            .collect(Collectors.toSet());
    if (recordKeyFields.size() != recordKeyIds.size()) {
      List<String> missingFieldPaths =
          recordKeyFields.stream()
              .map(InternalField::getPath)
              .filter(path -> partialSchema.findField(convertFromXTablePath(path)) == null)
              .collect(CustomCollectors.toList(recordKeyFields.size()));
      log.error("Missing field IDs for record key field paths: " + missingFieldPaths);
      throw new SchemaExtractorException(
          String.format("Mismatches in converting record key fields: %s", missingFieldPaths));
    }
    return new Schema(nestedFields, recordKeyIds);
  }