Type toIcebergType()

in kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java [230:292]


    Type toIcebergType(Schema valueSchema) {
      switch (valueSchema.type()) {
        case BOOLEAN:
          return BooleanType.get();
        case BYTES:
          if (Decimal.LOGICAL_NAME.equals(valueSchema.name())) {
            int scale = Integer.parseInt(valueSchema.parameters().get(Decimal.SCALE_FIELD));
            return DecimalType.of(38, scale);
          }
          return BinaryType.get();
        case INT8:
        case INT16:
          return IntegerType.get();
        case INT32:
          if (Date.LOGICAL_NAME.equals(valueSchema.name())) {
            return DateType.get();
          } else if (Time.LOGICAL_NAME.equals(valueSchema.name())) {
            return TimeType.get();
          }
          return IntegerType.get();
        case INT64:
          if (Timestamp.LOGICAL_NAME.equals(valueSchema.name())) {
            return TimestampType.withZone();
          }
          return LongType.get();
        case FLOAT32:
          return FloatType.get();
        case FLOAT64:
          return DoubleType.get();
        case ARRAY:
          Type elementType = toIcebergType(valueSchema.valueSchema());
          if (config.schemaForceOptional() || valueSchema.valueSchema().isOptional()) {
            return ListType.ofOptional(nextId(), elementType);
          } else {
            return ListType.ofRequired(nextId(), elementType);
          }
        case MAP:
          Type keyType = toIcebergType(valueSchema.keySchema());
          Type valueType = toIcebergType(valueSchema.valueSchema());
          if (config.schemaForceOptional() || valueSchema.valueSchema().isOptional()) {
            return MapType.ofOptional(nextId(), nextId(), keyType, valueType);
          } else {
            return MapType.ofRequired(nextId(), nextId(), keyType, valueType);
          }
        case STRUCT:
          List<NestedField> structFields =
              valueSchema.fields().stream()
                  .map(
                      field ->
                          NestedField.builder()
                              .isOptional(
                                  config.schemaForceOptional() || field.schema().isOptional())
                              .withId(nextId())
                              .ofType(toIcebergType(field.schema()))
                              .withName(field.name())
                              .build())
                  .collect(Collectors.toList());
          return StructType.of(structFields);
        case STRING:
        default:
          return StringType.get();
      }
    }