in hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java [215:348]
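/**
 * Converts a Flink {@link LogicalType} into an Avro {@link Schema}.
 *
 * <p>Nullable Flink types are wrapped into a nullable Avro schema via {@code nullableSchema},
 * and {@code rowName} is used to derive unique names for nested record and fixed types.
 *
 * <p>Illustrative usage (the row type below is an example, not taken from this file):
 * <pre>{@code
 * RowType rowType = RowType.of(new IntType(), new VarCharType(VarCharType.MAX_LENGTH));
 * Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType, "record");
 * }</pre>
 *
 * @param logicalType the Flink logical type to convert
 * @param rowName     base name used for nested record and fixed types
 * @return the corresponding Avro schema
 */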
public static Schema convertToSchema(LogicalType logicalType, String rowName) {
int precision;
boolean nullable = logicalType.isNullable();
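// nullable logical types are wrapped via nullableSchema(...) in every branch below,
// so Flink nullability is preserved in the resulting Avro schema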
switch (logicalType.getTypeRoot()) {
case NULL:
return SchemaBuilder.builder().nullType();
case BOOLEAN:
Schema bool = SchemaBuilder.builder().booleanType();
return nullable ? nullableSchema(bool) : bool;
case TINYINT:
case SMALLINT:
case INTEGER:
Schema integer = SchemaBuilder.builder().intType();
return nullable ? nullableSchema(integer) : integer;
case BIGINT:
Schema bigint = SchemaBuilder.builder().longType();
return nullable ? nullableSchema(bigint) : bigint;
case FLOAT:
Schema f = SchemaBuilder.builder().floatType();
return nullable ? nullableSchema(f) : f;
case DOUBLE:
Schema d = SchemaBuilder.builder().doubleType();
return nullable ? nullableSchema(d) : d;
case CHAR:
case VARCHAR:
Schema str = SchemaBuilder.builder().stringType();
return nullable ? nullableSchema(str) : str;
case BINARY:
case VARBINARY:
Schema binary = SchemaBuilder.builder().bytesType();
return nullable ? nullableSchema(binary) : binary;
case TIMESTAMP_WITHOUT_TIME_ZONE:
// use long to represent Timestamp
final TimestampType timestampType = (TimestampType) logicalType;
precision = timestampType.getPrecision();
org.apache.avro.LogicalType timestampLogicalType;
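// precision <= 3 maps to timestamp-millis, precision 4..6 to timestamp-micros;
// higher precision is rejected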
if (precision <= 3) {
timestampLogicalType = LogicalTypes.timestampMillis();
} else if (precision <= 6) {
timestampLogicalType = LogicalTypes.timestampMicros();
} else {
throw new IllegalArgumentException(
"Avro does not support TIMESTAMP type with precision: "
+ precision
+ ", it only support precisions <= 6.");
}
Schema timestamp = timestampLogicalType.addToSchema(SchemaBuilder.builder().longType());
return nullable ? nullableSchema(timestamp) : timestamp;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
// use long to represent LocalZonedTimestampType
final LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) logicalType;
precision = localZonedTimestampType.getPrecision();
org.apache.avro.LogicalType localZonedTimestampLogicalType;
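// same rules as TIMESTAMP: precision <= 3 maps to local-timestamp-millis,
// precision 4..6 to local-timestamp-micros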
if (precision <= 3) {
localZonedTimestampLogicalType = LogicalTypes.localTimestampMillis();
} else if (precision <= 6) {
localZonedTimestampLogicalType = LogicalTypes.localTimestampMicros();
} else {
throw new IllegalArgumentException(
"Avro does not support LOCAL TIMESTAMP type with precision: "
+ precision
+ ", it only support precisions <= 6.");
}
Schema localZonedTimestamp = localZonedTimestampLogicalType.addToSchema(SchemaBuilder.builder().longType());
return nullable ? nullableSchema(localZonedTimestamp) : localZonedTimestamp;
case DATE:
// use int to represent Date (days since the Unix epoch)
Schema date = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
return nullable ? nullableSchema(date) : date;
case TIME_WITHOUT_TIME_ZONE:
precision = ((TimeType) logicalType).getPrecision();
if (precision > 3) {
throw new IllegalArgumentException(
"Avro does not support TIME type with precision: "
+ precision
+ ", it only supports precision less than 3.");
}
// use int to represent Time; only millisecond precision is supported during deserialization
Schema time =
LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
return nullable ? nullableSchema(time) : time;
case DECIMAL:
DecimalType decimalType = (DecimalType) logicalType;
// store BigDecimal as an Avro fixed type
// for Spark compatibility.
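// the fixed size is the minimum number of bytes able to hold an unscaled value
// of the given precision (computed by computeMinBytesForDecimalPrecision)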
Schema decimal =
LogicalTypes.decimal(decimalType.getPrecision(), decimalType.getScale())
.addToSchema(SchemaBuilder
.fixed(String.format("%s.fixed", rowName))
.size(computeMinBytesForDecimalPrecision(decimalType.getPrecision())));
return nullable ? nullableSchema(decimal) : decimal;
case ROW:
RowType rowType = (RowType) logicalType;
List<String> fieldNames = rowType.getFieldNames();
// we have to make sure the record name is unique within the Schema
SchemaBuilder.FieldAssembler<Schema> builder =
SchemaBuilder.builder().record(rowName).fields();
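// nested records are named "<rowName>.<fieldName>" so record names stay unique
// across nesting levels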
for (int i = 0; i < rowType.getFieldCount(); i++) {
String fieldName = fieldNames.get(i);
LogicalType fieldType = rowType.getTypeAt(i);
SchemaBuilder.GenericDefault<Schema> fieldBuilder =
builder.name(fieldName)
.type(convertToSchema(fieldType, rowName + "." + fieldName));
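// nullable fields get an explicit null default, required fields get no default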
if (fieldType.isNullable()) {
builder = fieldBuilder.withDefault(null);
} else {
builder = fieldBuilder.noDefault();
}
}
Schema record = builder.endRecord();
return nullable ? nullableSchema(record) : record;
case MULTISET:
case MAP:
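// Avro maps only support string keys; extractValueTypeToAvroMap is expected to
// validate the key type and return the type to use for the map values
// (for MULTISET, presumably the element count)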
Schema map =
SchemaBuilder.builder()
.map()
.values(
convertToSchema(
extractValueTypeToAvroMap(logicalType), rowName));
return nullable ? nullableSchema(map) : map;
case ARRAY:
ArrayType arrayType = (ArrayType) logicalType;
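// nullability of the element type is handled by the recursive conversion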
Schema array =
SchemaBuilder.builder()
.array()
.items(convertToSchema(arrayType.getElementType(), rowName));
return nullable ? nullableSchema(array) : array;
case RAW:
default:
throw new UnsupportedOperationException(
"Unsupported to derive Schema for type: " + logicalType);
}
}