public static DataType convertToDataType()

in hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java [63:176]


  /**
   * Converts an Avro schema into the corresponding table {@link DataType}.
   *
   * <p>Mapping overview:
   * <ul>
   *   <li>RECORD: a ROW whose fields are the converted Avro fields, in declaration order;</li>
   *   <li>ENUM and STRING: STRING;</li>
   *   <li>ARRAY: ARRAY of the converted element type;</li>
   *   <li>MAP: MAP with NOT NULL STRING keys and the converted value type;</li>
   *   <li>UNION: a single branch converts to that branch's type; a two-branch union with a
   *       {@code null} branch converts to the nullable type of the other branch; any other
   *       union becomes a raw generic {@code Object} type, which is additionally wrapped in a
   *       one-field ROW named {@code "wrapper"} when {@code recordTypesOfSameNumFields}
   *       returns true for the non-null branches;</li>
   *   <li>FIXED and BYTES: DECIMAL when a decimal logical type is attached, otherwise
   *       VARBINARY of the fixed size or BYTES respectively;</li>
   *   <li>INT and LONG: DATE, TIME or TIMESTAMP variants when a matching logical type is
   *       attached, otherwise INT or BIGINT;</li>
   *   <li>FLOAT, DOUBLE, BOOLEAN and NULL: the data type of the same name.</li>
   * </ul>
   *
   * <p>Apart from the NULL type and unions, every result is explicitly marked NOT NULL.
   *
   * @param schema the Avro schema to convert
   * @return the data type matching the given schema
   * @throws IllegalArgumentException if the Avro type is not handled by any case
   */
  public static DataType convertToDataType(Schema schema) {
    final Schema.Type avroType = schema.getType();
    switch (avroType) {
      case RECORD: {
        // convert every Avro field in declaration order
        final DataTypes.Field[] rowFields = schema.getFields().stream()
            .map(avroField ->
                DataTypes.FIELD(avroField.name(), convertToDataType(avroField.schema())))
            .toArray(DataTypes.Field[]::new);
        return DataTypes.ROW(rowFields).notNull();
      }
      case ENUM:
      case STRING:
        // Avro enums and Utf8/CharSequence values are both mapped to STRING
        return DataTypes.STRING().notNull();
      case ARRAY: {
        final DataType elementType = convertToDataType(schema.getElementType());
        return DataTypes.ARRAY(elementType).notNull();
      }
      case MAP: {
        final DataType keyType = DataTypes.STRING().notNull();
        final DataType valueType = convertToDataType(schema.getValueType());
        return DataTypes.MAP(keyType, valueType).notNull();
      }
      case UNION: {
        final List<Schema> branches = schema.getTypes();
        final int branchCount = branches.size();

        if (branchCount == 1) {
          // single-branch union: the result is exactly what the branch converts to
          return convertToDataType(branches.get(0));
        }
        if (branchCount == 2) {
          // [null, T] or [T, null]: the nullable variant of T
          if (branches.get(0).getType() == Schema.Type.NULL) {
            return convertToDataType(branches.get(1)).nullable();
          }
          if (branches.get(1).getType() == Schema.Type.NULL) {
            return convertToDataType(branches.get(0)).nullable();
          }
        }

        // any other union: nullable only if at least one branch is the null type
        final List<Schema> nonNullBranches = branches.stream()
            .filter(branch -> Schema.Type.NULL != branch.getType())
            .collect(Collectors.toList());
        final boolean hasNullBranch = nonNullBranches.size() < branchCount;

        // raw generic Object type; use Kryo for serialization
        final DataType genericType = new AtomicDataType(
            new TypeInformationRawType<>(false, Types.GENERIC(Object.class)))
            .notNull();

        final DataType unionType;
        if (recordTypesOfSameNumFields(nonNullBranches)) {
          // expose the raw value through a single-field row
          unionType = DataTypes.ROW(DataTypes.FIELD("wrapper", genericType)).notNull();
        } else {
          unionType = genericType;
        }
        return hasNullBranch ? unionType.nullable() : unionType;
      }
      case FIXED: {
        final org.apache.avro.LogicalType fixedLogical = schema.getLogicalType();
        if (!(fixedLogical instanceof LogicalTypes.Decimal)) {
          // plain fixed-size binary data maps to a byte array of that size
          return DataTypes.VARBINARY(schema.getFixedSize()).notNull();
        }
        // logical decimal type
        final LogicalTypes.Decimal fixedDecimal = (LogicalTypes.Decimal) fixedLogical;
        return DataTypes.DECIMAL(fixedDecimal.getPrecision(), fixedDecimal.getScale())
            .notNull();
      }
      case BYTES: {
        final org.apache.avro.LogicalType bytesLogical = schema.getLogicalType();
        if (!(bytesLogical instanceof LogicalTypes.Decimal)) {
          return DataTypes.BYTES().notNull();
        }
        // logical decimal type
        final LogicalTypes.Decimal bytesDecimal = (LogicalTypes.Decimal) bytesLogical;
        return DataTypes.DECIMAL(bytesDecimal.getPrecision(), bytesDecimal.getScale())
            .notNull();
      }
      case INT: {
        // logical date and time types are stored on top of int
        final org.apache.avro.LogicalType intLogical = schema.getLogicalType();
        if (intLogical == LogicalTypes.date()) {
          return DataTypes.DATE().notNull();
        }
        if (intLogical == LogicalTypes.timeMillis()) {
          return DataTypes.TIME(3).notNull();
        }
        return DataTypes.INT().notNull();
      }
      case LONG: {
        // logical timestamp and time types are stored on top of long
        final org.apache.avro.LogicalType longLogical = schema.getLogicalType();
        if (longLogical == LogicalTypes.timestampMillis()) {
          return DataTypes.TIMESTAMP(3).notNull();
        }
        if (longLogical == LogicalTypes.localTimestampMillis()) {
          return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull();
        }
        if (longLogical == LogicalTypes.timestampMicros()) {
          return DataTypes.TIMESTAMP(6).notNull();
        }
        if (longLogical == LogicalTypes.localTimestampMicros()) {
          return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull();
        }
        if (longLogical == LogicalTypes.timeMillis()) {
          return DataTypes.TIME(3).notNull();
        }
        if (longLogical == LogicalTypes.timeMicros()) {
          return DataTypes.TIME(6).notNull();
        }
        return DataTypes.BIGINT().notNull();
      }
      case FLOAT:
        return DataTypes.FLOAT().notNull();
      case DOUBLE:
        return DataTypes.DOUBLE().notNull();
      case BOOLEAN:
        return DataTypes.BOOLEAN().notNull();
      case NULL:
        return DataTypes.NULL();
      default:
        throw new IllegalArgumentException("Unsupported Avro type '" + avroType + "'.");
    }
  }