in kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java [230:292]
Type toIcebergType(Schema valueSchema) {
switch (valueSchema.type()) {
case BOOLEAN:
return BooleanType.get();
case BYTES:
if (Decimal.LOGICAL_NAME.equals(valueSchema.name())) {
int scale = Integer.parseInt(valueSchema.parameters().get(Decimal.SCALE_FIELD));
return DecimalType.of(38, scale);
}
return BinaryType.get();
case INT8:
case INT16:
return IntegerType.get();
case INT32:
if (Date.LOGICAL_NAME.equals(valueSchema.name())) {
return DateType.get();
} else if (Time.LOGICAL_NAME.equals(valueSchema.name())) {
return TimeType.get();
}
return IntegerType.get();
case INT64:
if (Timestamp.LOGICAL_NAME.equals(valueSchema.name())) {
return TimestampType.withZone();
}
return LongType.get();
case FLOAT32:
return FloatType.get();
case FLOAT64:
return DoubleType.get();
case ARRAY:
Type elementType = toIcebergType(valueSchema.valueSchema());
if (config.schemaForceOptional() || valueSchema.valueSchema().isOptional()) {
return ListType.ofOptional(nextId(), elementType);
} else {
return ListType.ofRequired(nextId(), elementType);
}
case MAP:
Type keyType = toIcebergType(valueSchema.keySchema());
Type valueType = toIcebergType(valueSchema.valueSchema());
if (config.schemaForceOptional() || valueSchema.valueSchema().isOptional()) {
return MapType.ofOptional(nextId(), nextId(), keyType, valueType);
} else {
return MapType.ofRequired(nextId(), nextId(), keyType, valueType);
}
case STRUCT:
List<NestedField> structFields =
valueSchema.fields().stream()
.map(
field ->
NestedField.builder()
.isOptional(
config.schemaForceOptional() || field.schema().isOptional())
.withId(nextId())
.ofType(toIcebergType(field.schema()))
.withName(field.name())
.build())
.collect(Collectors.toList());
return StructType.of(structFields);
case STRING:
default:
return StringType.get();
}
}