in parquet-pig/src/main/java/org/apache/parquet/pig/PigSchemaConverter.java [288:346]
/**
 * Converts a Parquet group type into a Pig {@link FieldSchema}, dispatching on the group's
 * logical type annotation.
 *
 * <ul>
 *   <li>MAP: the group must hold exactly one non-primitive child, which must be a repeated group
 *       with exactly two fields whose own annotation is absent or MAP_KEY_VALUE. The result is a
 *       Pig MAP whose schema is the converted value type (field 1), with the alias of its first
 *       field cleared.</li>
 *   <li>LIST: if the group does not have exactly one field, or that field is primitive, field 0 is
 *       wrapped in a tuple named {@code ARRAY_VALUE_NAME} inside a Pig BAG. Otherwise the single
 *       field must be a repeated group, whose converted fields form the bag's tuple.</li>
 *   <li>No annotation: the group is converted to a Pig TUPLE of its converted fields.</li>
 *   <li>Any other annotation (the visitor yields no result): rejected with a
 *       {@code SchemaConversionException}.</li>
 * </ul>
 *
 * <p>Structurally invalid MAP or LIST groups, including an empty LIST group, are also rejected
 * with a {@code SchemaConversionException}.
 *
 * @param fieldName the name to give the resulting Pig field
 * @param parquetType the Parquet type to convert; it is cast with {@code asGroupType()}, so it is
 *     expected to be a group type
 * @return the Pig field schema describing {@code parquetType}
 * @throws FrontendException if converting a nested field fails
 */
private FieldSchema getComplexFieldSchema(String fieldName, Type parquetType)
    throws FrontendException {
  GroupType parquetGroupType = parquetType.asGroupType();
  LogicalTypeAnnotation logicalTypeAnnotation = parquetGroupType.getLogicalTypeAnnotation();
  if (logicalTypeAnnotation == null) {
    // if original type is not set, we assume it to be tuple
    return new FieldSchema(fieldName, convertFields(parquetGroupType.getFields()), DataType.TUPLE);
  }
  try {
    // The visitor's visit methods cannot throw the checked FrontendException, so nested conversion
    // failures are tunnelled out in a FrontendExceptionWrapper and unwrapped in the catch below.
    return logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<FieldSchema>() {
      @Override
      public Optional<FieldSchema> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) {
        try {
          // verify that it's a map: exactly one child, and that child is a group
          if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) {
            throw new SchemaConversionException("Invalid map type " + parquetGroupType);
          }
          GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType();
          LogicalTypeAnnotation keyValueAnnotation = mapKeyValType.getLogicalTypeAnnotation();
          // the key/value group must be repeated, unannotated or MAP_KEY_VALUE, and have two fields
          if (!mapKeyValType.isRepetition(Repetition.REPEATED)
              || (keyValueAnnotation != null
                  && !keyValueAnnotation.equals(LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance()))
              || mapKeyValType.getFieldCount() != 2) {
            throw new SchemaConversionException("Invalid map type " + parquetGroupType);
          }
          // Only the value (field 1) is converted; the key field is not inspected here.
          // if value is not primitive wrap it in a tuple
          Type valueType = mapKeyValType.getType(1);
          Schema valueSchema = convertField(valueType);
          valueSchema.getField(0).alias = null;
          return of(new FieldSchema(fieldName, valueSchema, DataType.MAP));
        } catch (FrontendException e) {
          throw new FrontendExceptionWrapper(e);
        }
      }

      @Override
      public Optional<FieldSchema> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
        try {
          // Reject an empty LIST group explicitly; otherwise getType(0) below would fail with an
          // IndexOutOfBoundsException instead of a meaningful schema error.
          if (parquetGroupType.getFieldCount() == 0) {
            throw new SchemaConversionException("Invalid list type " + parquetGroupType);
          }
          Type type = parquetGroupType.getType(0);
          if (parquetGroupType.getFieldCount() != 1 || type.isPrimitive()) {
            // an array is effectively a bag
            // NOTE(review): with more than one field only field 0 is used, and if it is a group it
            // is still handed to getSimpleFieldSchema -- confirm this fallback is intended.
            Schema primitiveSchema = new Schema(getSimpleFieldSchema(parquetGroupType.getFieldName(0), type));
            Schema tupleSchema = new Schema(new FieldSchema(ARRAY_VALUE_NAME, primitiveSchema, DataType.TUPLE));
            return of(new FieldSchema(fieldName, tupleSchema, DataType.BAG));
          }
          // a single group child must be the repeated element group; it becomes the bag's tuple
          GroupType tupleType = type.asGroupType();
          if (!tupleType.isRepetition(Repetition.REPEATED)) {
            throw new SchemaConversionException("Invalid list type " + parquetGroupType);
          }
          Schema tupleSchema = new Schema(new FieldSchema(tupleType.getName(), convertFields(tupleType.getFields()), DataType.TUPLE));
          return of(new FieldSchema(fieldName, tupleSchema, DataType.BAG));
        } catch (FrontendException e) {
          throw new FrontendExceptionWrapper(e);
        }
      }
    }).orElseThrow(() -> new SchemaConversionException("Unexpected original type for " + parquetType + ": " + logicalTypeAnnotation));
  } catch (FrontendExceptionWrapper e) {
    throw e.frontendException;
  }
}