public static Schema convertToSchema(LogicalType logicalType, String rowName)

in hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java [215:348]


  public static Schema convertToSchema(LogicalType logicalType, String rowName) {
    int precision;
    boolean nullable = logicalType.isNullable();
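    // each nullable logical type is wrapped via nullableSchema(...) so the resulting Avro schema also accepts null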
    switch (logicalType.getTypeRoot()) {
      case NULL:
        return SchemaBuilder.builder().nullType();
      case BOOLEAN:
        Schema bool = SchemaBuilder.builder().booleanType();
        return nullable ? nullableSchema(bool) : bool;
      case TINYINT:
      case SMALLINT:
      case INTEGER:
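        // Avro has no 8- or 16-bit integer types, so TINYINT and SMALLINT are widened to int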
        Schema integer = SchemaBuilder.builder().intType();
        return nullable ? nullableSchema(integer) : integer;
      case BIGINT:
        Schema bigint = SchemaBuilder.builder().longType();
        return nullable ? nullableSchema(bigint) : bigint;
      case FLOAT:
        Schema f = SchemaBuilder.builder().floatType();
        return nullable ? nullableSchema(f) : f;
      case DOUBLE:
        Schema d = SchemaBuilder.builder().doubleType();
        return nullable ? nullableSchema(d) : d;
      case CHAR:
      case VARCHAR:
        Schema str = SchemaBuilder.builder().stringType();
        return nullable ? nullableSchema(str) : str;
      case BINARY:
      case VARBINARY:
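        // both fixed-length BINARY and VARBINARY map to Avro's variable-length bytes type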
        Schema binary = SchemaBuilder.builder().bytesType();
        return nullable ? nullableSchema(binary) : binary;
      case TIMESTAMP_WITHOUT_TIME_ZONE:
        // use long to represent TIMESTAMP; timestamp-millis vs. timestamp-micros is chosen by precision
        final TimestampType timestampType = (TimestampType) logicalType;
        precision = timestampType.getPrecision();
        org.apache.avro.LogicalType timestampLogicalType;
        if (precision <= 3) {
          timestampLogicalType = LogicalTypes.timestampMillis();
        } else if (precision <= 6) {
          timestampLogicalType = LogicalTypes.timestampMicros();
        } else {
          throw new IllegalArgumentException(
              "Avro does not support TIMESTAMP type with precision: "
                  + precision
                  + ", it only support precisions <= 6.");
        }
        Schema timestamp = timestampLogicalType.addToSchema(SchemaBuilder.builder().longType());
        return nullable ? nullableSchema(timestamp) : timestamp;
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
        // use long to represent LocalZonedTimestampType; millis vs. micros is chosen by precision
        final LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) logicalType;
        precision = localZonedTimestampType.getPrecision();
        org.apache.avro.LogicalType localZonedTimestampLogicalType;
        if (precision <= 3) {
          localZonedTimestampLogicalType = LogicalTypes.localTimestampMillis();
        } else if (precision <= 6) {
          localZonedTimestampLogicalType = LogicalTypes.localTimestampMicros();
        } else {
          throw new IllegalArgumentException(
              "Avro does not support LOCAL TIMESTAMP type with precision: "
                  + precision
                  + ", it only support precisions <= 6.");
        }
        Schema localZonedTimestamp = localZonedTimestampLogicalType.addToSchema(SchemaBuilder.builder().longType());
        return nullable ? nullableSchema(localZonedTimestamp) : localZonedTimestamp;
      case DATE:
        // use int to represent Date (days since the Unix epoch)
        Schema date = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
        return nullable ? nullableSchema(date) : date;
      case TIME_WITHOUT_TIME_ZONE:
        precision = ((TimeType) logicalType).getPrecision();
        if (precision > 3) {
          throw new IllegalArgumentException(
              "Avro does not support TIME type with precision: "
                  + precision
                  + ", it only supports precision less than 3.");
        }
        // use int to represent Time; only millisecond precision is supported during deserialization
        Schema time =
            LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
        return nullable ? nullableSchema(time) : time;
      case DECIMAL:
        DecimalType decimalType = (DecimalType) logicalType;
        // store BigDecimal as an Avro fixed type for Spark compatibility
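        // the fixed size is the minimum number of bytes able to hold an unscaled value of the given precision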
        Schema decimal =
            LogicalTypes.decimal(decimalType.getPrecision(), decimalType.getScale())
                .addToSchema(SchemaBuilder
                    .fixed(String.format("%s.fixed", rowName))
                    .size(computeMinBytesForDecimalPrecision(decimalType.getPrecision())));
        return nullable ? nullableSchema(decimal) : decimal;
      case ROW:
        RowType rowType = (RowType) logicalType;
        List<String> fieldNames = rowType.getFieldNames();
        // record names must be unique within a schema, so nested rows are qualified with the parent row name
        SchemaBuilder.FieldAssembler<Schema> builder =
            SchemaBuilder.builder().record(rowName).fields();
        for (int i = 0; i < rowType.getFieldCount(); i++) {
          String fieldName = fieldNames.get(i);
          LogicalType fieldType = rowType.getTypeAt(i);
          SchemaBuilder.GenericDefault<Schema> fieldBuilder =
              builder.name(fieldName)
                  .type(convertToSchema(fieldType, rowName + "." + fieldName));

          if (fieldType.isNullable()) {
            builder = fieldBuilder.withDefault(null);
          } else {
            builder = fieldBuilder.noDefault();
          }
        }
        Schema record = builder.endRecord();
        return nullable ? nullableSchema(record) : record;
      case MULTISET:
      case MAP:
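        // Avro map keys are always strings; only the value type is converted here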
        Schema map =
            SchemaBuilder.builder()
                .map()
                .values(
                    convertToSchema(
                        extractValueTypeToAvroMap(logicalType), rowName));
        return nullable ? nullableSchema(map) : map;
      case ARRAY:
        ArrayType arrayType = (ArrayType) logicalType;
        Schema array =
            SchemaBuilder.builder()
                .array()
                .items(convertToSchema(arrayType.getElementType(), rowName));
        return nullable ? nullableSchema(array) : array;
      case RAW:
      default:
        throw new UnsupportedOperationException(
            "Unsupported to derive Schema for type: " + logicalType);
    }
  }
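
A minimal usage sketch, assuming Flink's table type classes (flink-table-common) and Avro are on the classpath. The class name AvroSchemaConverterExample, the field layout, and the root record name "record" are illustrative choices, not part of the converter's API:

import org.apache.avro.Schema;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hudi.util.AvroSchemaConverter;

public class AvroSchemaConverterExample {

  public static void main(String[] args) {
    // Row with a non-nullable BIGINT `id` and a nullable STRING `name`.
    RowType rowType = RowType.of(
        new LogicalType[] {new BigIntType(false), new VarCharType(VarCharType.MAX_LENGTH)},
        new String[] {"id", "name"});

    // The second argument names the root Avro record; nested rows are named
    // by appending their field names to it (see the ROW branch above).
    Schema schema = AvroSchemaConverter.convertToSchema(rowType, "record");

    // Print the resulting Avro schema in pretty JSON form.
    System.out.println(schema.toString(true));
  }
}

With this input, the BIGINT field comes out as a plain long with no default, while the nullable STRING field becomes a union of null and string with a null default, matching the nullableSchema wrapping and the withDefault(null) handling in the ROW branch above.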