in xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java [65:100]
/**
 * Converts an XTable {@link InternalSchema} into an Iceberg {@link Schema}.
 *
 * <p>Record key fields of the internal schema are mapped to Iceberg identifier fields. Iceberg
 * requires identifier fields to be required (non-nullable), so when any record key field is
 * nullable the schema is returned without identifier fields and a warning is logged.
 *
 * @param internalSchema the source schema to convert
 * @return the equivalent Iceberg schema, with identifier field IDs set when all record key
 *     fields are non-nullable
 * @throws SchemaExtractorException if any record key field path cannot be resolved to a field in
 *     the converted Iceberg schema
 */
public Schema toIceberg(InternalSchema internalSchema) {
  // if field IDs are not assigned in the source, just use an incrementing integer
  AtomicInteger fieldIdTracker = new AtomicInteger(0);
  List<Types.NestedField> nestedFields = convertFields(internalSchema, fieldIdTracker);
  List<InternalField> recordKeyFields = internalSchema.getRecordKeyFields();
  if (recordKeyFields.isEmpty()) {
    return new Schema(nestedFields);
  }
  // Iceberg requires the identifier fields to be required fields, so if any of the record key
  // fields are nullable, we cannot add the identifier fields to the schema properties.
  boolean anyRecordKeyNullable =
      recordKeyFields.stream().anyMatch(f -> f.getSchema().isNullable());
  if (anyRecordKeyNullable) {
    log.warn(
        "Record key fields are not required. Not setting record key fields in iceberg schema.");
    return new Schema(nestedFields);
  }
  // Resolve every record key path against the converted fields. Missing paths are detected
  // directly rather than by comparing collection sizes: the ID set below is deduplicated, so a
  // size comparison would wrongly report a mismatch when two key fields resolve to the same
  // Iceberg field (e.g. a duplicated key path).
  Schema partialSchema = new Schema(nestedFields);
  List<String> missingFieldPaths =
      recordKeyFields.stream()
          .map(InternalField::getPath)
          .filter(path -> partialSchema.findField(convertFromXTablePath(path)) == null)
          .collect(CustomCollectors.toList(recordKeyFields.size()));
  if (!missingFieldPaths.isEmpty()) {
    log.error("Missing field IDs for record key field paths: {}", missingFieldPaths);
    throw new SchemaExtractorException(
        String.format("Mismatches in converting record key fields: %s", missingFieldPaths));
  }
  // Every path is known to resolve at this point, so findField never returns null here.
  Set<Integer> recordKeyIds =
      recordKeyFields.stream()
          .map(keyField -> partialSchema.findField(convertFromXTablePath(keyField.getPath())))
          .map(Types.NestedField::fieldId)
          .collect(Collectors.toSet());
  return new Schema(nestedFields, recordKeyIds);
}