in parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java [321:477]
/**
 * Converts a single Parquet field type into the equivalent Avro schema.
 *
 * <p>Primitive types map onto Avro primitives. INT96 is only accepted when
 * {@code readInt96AsFixed} is enabled, in which case it becomes a 12-byte fixed. The Parquet
 * logical type annotation is carried over as an Avro logical type whenever
 * {@code convertLogicalType} yields one.
 *
 * <p>Group types are handled by annotation:
 * <ul>
 *   <li>LIST becomes an Avro array.</li>
 *   <li>MAP / MAP_KEY_VALUE becomes an Avro map.</li>
 *   <li>ENUM becomes an Avro string.</li>
 *   <li>An un-annotated group becomes an Avro record built by {@code convertFields}.</li>
 * </ul>
 *
 * @param parquetType the Parquet type to convert
 * @param names bookkeeping passed through to {@code namespace(...)} and {@code convertFields(...)}
 *     when naming fixed/record schemas (NOTE(review): exact semantics live in those helpers)
 * @return the Avro schema corresponding to {@code parquetType}
 * @throws IllegalArgumentException if INT96 is encountered while {@code readInt96AsFixed} is
 *     disabled, or if a map key is not a UTF8-annotated BINARY primitive
 * @throws UnsupportedOperationException if a LIST or MAP group is structurally invalid, or if the
 *     group's logical type annotation has no Avro equivalent
 */
private Schema convertField(final Type parquetType, Map<String, Integer> names) {
  if (parquetType.isPrimitive()) {
    final PrimitiveType asPrimitive = parquetType.asPrimitiveType();
    final PrimitiveTypeName parquetPrimitiveTypeName = asPrimitive.getPrimitiveTypeName();
    final LogicalTypeAnnotation annotation = parquetType.getLogicalTypeAnnotation();
    Schema schema = parquetPrimitiveTypeName.convert(
        new PrimitiveType.PrimitiveTypeNameConverter<Schema, RuntimeException>() {
          @Override
          public Schema convertBOOLEAN(PrimitiveTypeName primitiveTypeName) {
            return Schema.create(Schema.Type.BOOLEAN);
          }

          @Override
          public Schema convertINT32(PrimitiveTypeName primitiveTypeName) {
            return Schema.create(Schema.Type.INT);
          }

          @Override
          public Schema convertINT64(PrimitiveTypeName primitiveTypeName) {
            return Schema.create(Schema.Type.LONG);
          }

          @Override
          public Schema convertINT96(PrimitiveTypeName primitiveTypeName) {
            if (readInt96AsFixed) {
              return Schema.createFixed("INT96", "INT96 represented as byte[12]", null, 12);
            }
            throw new IllegalArgumentException(
                "INT96 is deprecated. As interim enable READ_INT96_AS_FIXED flag to read as byte array.");
          }

          @Override
          public Schema convertFLOAT(PrimitiveTypeName primitiveTypeName) {
            return Schema.create(Schema.Type.FLOAT);
          }

          @Override
          public Schema convertDOUBLE(PrimitiveTypeName primitiveTypeName) {
            return Schema.create(Schema.Type.DOUBLE);
          }

          @Override
          public Schema convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName primitiveTypeName) {
            if (annotation instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation) {
              // UUIDs surface as Avro strings; any Avro logical type for the annotation is
              // attached below via convertLogicalType.
              return Schema.create(Schema.Type.STRING);
            }
            int size = asPrimitive.getTypeLength();
            String name = parquetType.getName();
            String ns = namespace(name, names);
            return Schema.createFixed(name, null, ns, size);
          }

          @Override
          public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) {
            if (annotation instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation
                || annotation instanceof LogicalTypeAnnotation.EnumLogicalTypeAnnotation) {
              return Schema.create(Schema.Type.STRING);
            }
            return Schema.create(Schema.Type.BYTES);
          }
        });

    LogicalType logicalType = convertLogicalType(annotation);
    // Avro's decimal logical type may only annotate bytes or fixed, so a decimal stored in
    // INT32/INT64 keeps its plain int/long schema.
    boolean decimalOnUnsupportedPhysicalType =
        annotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation
            && parquetPrimitiveTypeName != BINARY
            && parquetPrimitiveTypeName != FIXED_LEN_BYTE_ARRAY;
    if (logicalType != null && !decimalOnUnsupportedPhysicalType) {
      schema = logicalType.addToSchema(schema);
    }
    return schema;
  }

  GroupType parquetGroupType = parquetType.asGroupType();
  LogicalTypeAnnotation logicalTypeAnnotation = parquetGroupType.getLogicalTypeAnnotation();
  if (logicalTypeAnnotation == null) {
    // if no original type then it's a record
    return convertFields(parquetGroupType.getName(), parquetGroupType.getFields(), names);
  }

  return logicalTypeAnnotation
      .accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<Schema>() {
        @Override
        public Optional<Schema> visit(
            LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
          if (parquetGroupType.getFieldCount() != 1) {
            throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
          }
          Type repeatedType = parquetGroupType.getType(0);
          if (!repeatedType.isRepetition(REPEATED)) {
            throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
          }
          if (isElementType(repeatedType, parquetGroupType.getName())) {
            // repeated element types are always required
            return of(Schema.createArray(convertField(repeatedType, names)));
          }
          // Three-level list: the repeated group is a synthetic wrapper around the element.
          Type elementType = repeatedType.asGroupType().getType(0);
          Schema elementSchema = convertField(elementType, names);
          return of(Schema.createArray(
              elementType.isRepetition(Type.Repetition.OPTIONAL)
                  ? optional(elementSchema)
                  : elementSchema));
        }

        @Override
        // for backward-compatibility
        public Optional<Schema> visit(
            LogicalTypeAnnotation.MapKeyValueTypeAnnotation mapKeyValueLogicalType) {
          return visitMapOrMapKeyValue();
        }

        @Override
        public Optional<Schema> visit(
            LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) {
          return visitMapOrMapKeyValue();
        }

        /** Converts a MAP / MAP_KEY_VALUE group (one repeated key/value group) to an Avro map. */
        private Optional<Schema> visitMapOrMapKeyValue() {
          if (parquetGroupType.getFieldCount() != 1
              || parquetGroupType.getType(0).isPrimitive()) {
            throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
          }
          GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType();
          if (!mapKeyValType.isRepetition(REPEATED) || mapKeyValType.getFieldCount() != 2) {
            throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
          }
          Type keyType = mapKeyValType.getType(0);
          // Compare from the stringType() side: an un-annotated key yields a null annotation,
          // which must be rejected with the IllegalArgumentException below, not an NPE.
          if (!keyType.isPrimitive()
              || !keyType.asPrimitiveType()
                  .getPrimitiveTypeName()
                  .equals(PrimitiveTypeName.BINARY)
              || !stringType().equals(keyType.getLogicalTypeAnnotation())) {
            throw new IllegalArgumentException(
                "Map key type must be binary (UTF8): " + keyType);
          }
          Type valueType = mapKeyValType.getType(1);
          Schema valueSchema = convertField(valueType, names);
          return of(Schema.createMap(
              valueType.isRepetition(Type.Repetition.OPTIONAL)
                  ? optional(valueSchema)
                  : valueSchema));
        }

        @Override
        public Optional<Schema> visit(
            LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
          return of(Schema.create(Schema.Type.STRING));
        }
      })
      .orElseThrow(
          () -> new UnsupportedOperationException("Cannot convert Parquet type " + parquetType));
}