in xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java [98:295]
/**
 * Recursively converts an Avro {@link Schema} into an {@link InternalSchema}.
 *
 * <p>Primitive and logical types are mapped to the matching {@link InternalType}. Extra type
 * information (decimal precision/scale, fixed byte size, timestamp precision, enum symbols) is
 * stored in the schema metadata. Records, arrays and maps are converted child by child. A union of
 * {@code null} with exactly one other type is unwrapped into that type and marked nullable.
 *
 * @param schema the Avro schema to convert
 * @param parentPath path of the enclosing field; combined with child names through {@code
 *     SchemaUtils.getFullyQualifiedPath} to build the paths of nested fields
 * @param fieldNameToIdMapping field-id mappings for the current nesting level, looked up by record
 *     field name, or by the {@code ELEMENT} / {@code KEY} / {@code VALUE} keys for arrays and maps
 * @return the equivalent internal schema
 * @throws UnsupportedSchemaTypeException if {@code schema} is a union other than "null plus exactly
 *     one other type", or has a schema type not handled below
 */
private InternalSchema toInternalSchema(
    Schema schema, String parentPath, Map<String, IdMapping> fieldNameToIdMapping) {
  // TODO - Does not handle recursion in Avro schema
  InternalType newDataType;
  Map<InternalSchema.MetadataKey, Object> metadata = new HashMap<>();
  // Read once up front; only the INT, BYTES/FIXED and LONG branches consult it.
  LogicalType logicalType = schema.getLogicalType();
  switch (schema.getType()) {
    case INT:
      if (logicalType instanceof LogicalTypes.Date) {
        newDataType = InternalType.DATE;
      } else {
        newDataType = InternalType.INT;
      }
      break;
    case STRING:
      newDataType = InternalType.STRING;
      break;
    case BOOLEAN:
      newDataType = InternalType.BOOLEAN;
      break;
    case BYTES:
    case FIXED:
      if (logicalType instanceof LogicalTypes.Decimal) {
        LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
        metadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, decimalType.getPrecision());
        metadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, decimalType.getScale());
        if (schema.getType() == Schema.Type.FIXED) {
          metadata.put(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, schema.getFixedSize());
        }
        newDataType = InternalType.DECIMAL;
        break;
      }
      if (schema.getType() == Schema.Type.FIXED) {
        // A FIXED schema tagged with the xtable logical-type property "uuid" is surfaced as UUID.
        String xtableLogicalType = schema.getProp(InternalSchema.XTABLE_LOGICAL_TYPE);
        if ("uuid".equals(xtableLogicalType)) {
          newDataType = InternalType.UUID;
        } else {
          metadata.put(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, schema.getFixedSize());
          newDataType = InternalType.FIXED;
        }
      } else {
        newDataType = InternalType.BYTES;
      }
      break;
    case DOUBLE:
      newDataType = InternalType.DOUBLE;
      break;
    case FLOAT:
      newDataType = InternalType.FLOAT;
      break;
    case LONG:
      if (logicalType instanceof LogicalTypes.TimestampMillis) {
        newDataType = InternalType.TIMESTAMP;
        metadata.put(
            InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MILLIS);
      } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
        newDataType = InternalType.TIMESTAMP;
        metadata.put(
            InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);
      } else if (logicalType instanceof LogicalTypes.LocalTimestampMillis) {
        // TODO: https://github.com/apache/incubator-xtable/issues/672
        // Hudi 0.x writes INT64 in parquet, TimestampNTZType support added in 1.x
        // Mapped to LONG, but the precision is still recorded in the metadata.
        newDataType = InternalType.LONG;
        metadata.put(
            InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MILLIS);
      } else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) {
        // TODO: https://github.com/apache/incubator-xtable/issues/672
        // Hudi 0.x writes INT64 in parquet, TimestampNTZType support added in 1.x
        // Mapped to LONG, but the precision is still recorded in the metadata.
        newDataType = InternalType.LONG;
        metadata.put(
            InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);
      } else {
        newDataType = InternalType.LONG;
      }
      break;
    case ENUM:
      metadata.put(InternalSchema.MetadataKey.ENUM_VALUES, schema.getEnumSymbols());
      newDataType = InternalType.ENUM;
      break;
    case NULL:
      newDataType = InternalType.NULL;
      break;
    case RECORD:
      List<InternalField> subFields = new ArrayList<>(schema.getFields().size());
      for (Schema.Field avroField : schema.getFields()) {
        IdMapping idMapping = fieldNameToIdMapping.get(avroField.name());
        InternalSchema subFieldSchema =
            toInternalSchema(
                avroField.schema(),
                SchemaUtils.getFullyQualifiedPath(parentPath, avroField.name()),
                getChildIdMap(idMapping));
        Object defaultValue = getDefaultValue(avroField);
        subFields.add(
            InternalField.builder()
                .parentPath(parentPath)
                .name(avroField.name())
                .schema(subFieldSchema)
                .defaultValue(defaultValue)
                .fieldId(idMapping == null ? null : idMapping.getId())
                .build());
      }
      return InternalSchema.builder()
          .name(schema.getName())
          .comment(schema.getDoc())
          .dataType(InternalType.RECORD)
          .fields(subFields)
          .isNullable(schema.isNullable())
          .build();
    case ARRAY:
      IdMapping elementMapping = fieldNameToIdMapping.get(ELEMENT);
      InternalSchema elementSchema =
          toInternalSchema(
              schema.getElementType(),
              SchemaUtils.getFullyQualifiedPath(
                  parentPath, InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME),
              getChildIdMap(elementMapping));
      InternalField elementField =
          InternalField.builder()
              .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)
              .parentPath(parentPath)
              .schema(elementSchema)
              .fieldId(elementMapping == null ? null : elementMapping.getId())
              .build();
      return InternalSchema.builder()
          .name(schema.getName())
          .dataType(InternalType.LIST)
          .comment(schema.getDoc())
          .isNullable(schema.isNullable())
          .fields(Collections.singletonList(elementField))
          .build();
    case MAP:
      IdMapping keyMapping = fieldNameToIdMapping.get(KEY);
      IdMapping valueMapping = fieldNameToIdMapping.get(VALUE);
      InternalSchema valueSchema =
          toInternalSchema(
              schema.getValueType(),
              SchemaUtils.getFullyQualifiedPath(
                  parentPath, InternalField.Constants.MAP_VALUE_FIELD_NAME),
              getChildIdMap(valueMapping));
      InternalField valueField =
          InternalField.builder()
              .name(InternalField.Constants.MAP_VALUE_FIELD_NAME)
              .parentPath(parentPath)
              .schema(valueSchema)
              .fieldId(valueMapping == null ? null : valueMapping.getId())
              .build();
      return InternalSchema.builder()
          .name(schema.getName())
          .dataType(InternalType.MAP)
          .comment(schema.getDoc())
          .isNullable(schema.isNullable())
          .fields(
              Arrays.asList(
                  MAP_KEY_FIELD.toBuilder()
                      .parentPath(parentPath)
                      .fieldId(keyMapping == null ? null : keyMapping.getId())
                      .build(),
                  valueField))
          .build();
    case UNION:
      boolean containsUnionWithNull =
          schema.getTypes().stream().anyMatch(t -> t.getType() == Schema.Type.NULL);
      if (!containsUnionWithNull) {
        throw new UnsupportedSchemaTypeException(
            String.format("Unsupported complex union type %s", schema));
      }
      List<Schema> remainingSchemas =
          schema.getTypes().stream()
              .filter(t -> t.getType() != Schema.Type.NULL)
              .collect(Collectors.toList());
      if (remainingSchemas.size() != 1) {
        // Only "null + exactly one type" is representable. The non-null branches form a union
        // without null, which is rejected above, so fail directly with that same message instead
        // of re-entering the converter without this level's path and id mappings.
        throw new UnsupportedSchemaTypeException(
            String.format(
                "Unsupported complex union type %s", Schema.createUnion(remainingSchemas)));
      }
      // The union does not add a nesting level, so the same path and id mappings apply.
      InternalSchema restSchema =
          toInternalSchema(remainingSchemas.get(0), parentPath, fieldNameToIdMapping);
      return InternalSchema.builderFrom(restSchema).isNullable(true).build();
    default:
      throw new UnsupportedSchemaTypeException(
          String.format("Unsupported schema type %s", schema));
  }
  return InternalSchema.builder()
      .name(schema.getName())
      .dataType(newDataType)
      .comment(schema.getDoc())
      .isNullable(schema.isNullable())
      .metadata(metadata.isEmpty() ? null : metadata)
      .build();
}