in exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java [205:385]
/**
 * Selects the {@link PrimitiveConverter} for a Parquet primitive field, based on
 * the field's physical type ({@code type.getPrimitiveTypeName()}) and, where
 * present, its original / logical type annotation.
 *
 * <p>For repeated fields ({@code Repetition.REPEATED}) the writer is obtained
 * through the list-writer lambdas passed to {@code getWriter}; otherwise the
 * scalar writer lambdas are used.</p>
 *
 * @param name name handed to the writer / converter factories
 *             (presumably the field name - confirm against callers)
 * @param type Parquet primitive type describing the field
 * @return converter matching the physical and annotated type of the field
 * @throws UnsupportedOperationException if the physical type, or its annotation,
 *         has no converter in this method
 * @throws DrillRuntimeException if {@code containsCorruptedDates} holds a status
 *         not handled for DATE fields
 */
protected PrimitiveConverter getConverterForType(String name, PrimitiveType type) {
  switch (type.getPrimitiveTypeName()) {
    case INT32: {
      if (type.getOriginalType() == null) {
        return getIntConverter(name, type);
      }
      switch (type.getOriginalType()) {
        case UINT_8:
        case UINT_16:
        case UINT_32:
        case INT_8:
        case INT_16:
        case INT_32: {
          return getIntConverter(name, type);
        }
        case DECIMAL: {
          ParquetReaderUtility.checkDecimalTypeEnabled(options);
          return getVarDecimalConverter(name, type);
        }
        case DATE: {
          DateWriter writer = type.isRepetition(Repetition.REPEATED)
              ? getWriter(name, (m, f) -> m.list(f).date(), l -> l.list().date())
              : getWriter(name, (m, f) -> m.date(f), l -> l.date());
          // The converter is picked from the date-corruption status held by this instance.
          switch (containsCorruptedDates) {
            case META_SHOWS_CORRUPTION:
              return new DrillCorruptedDateConverter(writer);
            case META_SHOWS_NO_CORRUPTION:
              return new DrillDateConverter(writer);
            case META_UNCLEAR_TEST_VALUES:
              return new CorruptionDetectingDateConverter(writer);
            default:
              throw new DrillRuntimeException(
                  String.format("Issue setting up parquet reader for date type, " +
                          "unrecognized date corruption status %s. See DRILL-4203 for more info.",
                      containsCorruptedDates));
          }
        }
        case TIME_MILLIS: {
          TimeWriter writer = type.isRepetition(Repetition.REPEATED)
              ? getWriter(name, (m, f) -> m.list(f).time(), l -> l.list().time())
              : getWriter(name, MapWriter::time, ListWriter::time);
          return new DrillTimeConverter(writer);
        }
        default: {
          throw new UnsupportedOperationException("Unsupported type: " + type.getOriginalType());
        }
      }
    }
    case INT64: {
      if (type.getOriginalType() == null) {
        return getBigIntConverter(name, type);
      }
      switch (type.getOriginalType()) {
        case UINT_64:
        case INT_64:
          return getBigIntConverter(name, type);
        case TIMESTAMP_MICROS: {
          // Option decides whether the raw INT64 value is exposed instead of a timestamp.
          if (options.getBoolean(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)) {
            return getBigIntConverter(name, type);
          } else {
            TimeStampWriter writer = getTimeStampWriter(name, type);
            return new DrillTimeStampMicrosConverter(writer);
          }
        }
        case TIME_MICROS: {
          // Option decides whether the raw INT64 value is exposed instead of a time.
          if (options.getBoolean(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)) {
            return getBigIntConverter(name, type);
          } else {
            TimeWriter writer = type.isRepetition(Repetition.REPEATED)
                ? getWriter(name, (m, f) -> m.list(f).time(), l -> l.list().time())
                : getWriter(name, MapWriter::time, ListWriter::time);
            return new DrillTimeMicrosConverter(writer);
          }
        }
        case DECIMAL: {
          ParquetReaderUtility.checkDecimalTypeEnabled(options);
          return getVarDecimalConverter(name, type);
        }
        case TIMESTAMP_MILLIS: {
          TimeStampWriter writer = getTimeStampWriter(name, type);
          return new DrillTimeStampConverter(writer);
        }
        default: {
          throw new UnsupportedOperationException("Unsupported type " + type.getOriginalType());
        }
      }
    }
    case INT96: {
      // TODO: replace null with TIMESTAMP_NANOS once parquet supports such type annotation.
      if (type.getOriginalType() == null) {
        if (options.getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) {
          TimeStampWriter writer = getTimeStampWriter(name, type);
          return new DrillFixedBinaryToTimeStampConverter(writer);
        } else {
          VarBinaryWriter writer = type.isRepetition(Repetition.REPEATED)
              ? getWriter(name, (m, f) -> m.list(f).varBinary(), l -> l.list().varBinary())
              : getWriter(name, MapWriter::varBinary, ListWriter::varBinary);
          // Byte width is derived from the physical type's bit length.
          return new DrillFixedBinaryToVarbinaryConverter(writer,
              ParquetColumnMetadata.getTypeLengthInBits(type.getPrimitiveTypeName()) / 8,
              mutator.getManagedBuffer());
        }
      }
      // An annotated INT96 has no converter here. Fail explicitly: previously this case
      // fell through to FLOAT and handed INT96 data a Float4 converter.
      throw new UnsupportedOperationException("Unsupported type: " + type.getOriginalType());
    }
    case FLOAT: {
      Float4Writer writer = type.isRepetition(Repetition.REPEATED)
          ? getWriter(name, (m, f) -> m.list(f).float4(), l -> l.list().float4())
          : getWriter(name, (m, f) -> m.float4(f), l -> l.float4());
      return new DrillFloat4Converter(writer);
    }
    case DOUBLE: {
      Float8Writer writer = type.isRepetition(Repetition.REPEATED)
          ? getWriter(name, (m, f) -> m.list(f).float8(), l -> l.list().float8())
          : getWriter(name, (m, f) -> m.float8(f), l -> l.float8());
      return new DrillFloat8Converter(writer);
    }
    case BOOLEAN: {
      BitWriter writer = type.isRepetition(Repetition.REPEATED)
          ? getWriter(name, (m, f) -> m.list(f).bit(), l -> l.list().bit())
          : getWriter(name, (m, f) -> m.bit(f), l -> l.bit());
      return new DrillBoolConverter(writer);
    }
    case BINARY: {
      // Visitor handles the annotations that have a dedicated converter
      // (decimal, string, enum).
      LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<PrimitiveConverter> typeAnnotationVisitor = new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<PrimitiveConverter>() {
        @Override
        public Optional<PrimitiveConverter> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
          ParquetReaderUtility.checkDecimalTypeEnabled(options);
          return Optional.of(getVarDecimalConverter(name, type));
        }
        @Override
        public Optional<PrimitiveConverter> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
          return Optional.of(getVarCharConverter(name, type));
        }
        @Override
        public Optional<PrimitiveConverter> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation stringLogicalType) {
          return Optional.of(getVarCharConverter(name, type));
        }
      };
      // Fallback used when there is no annotation or the visitor yields no converter.
      Supplier<PrimitiveConverter> converterSupplier = () -> {
        VarBinaryWriter writer = type.isRepetition(Repetition.REPEATED)
            ? getWriter(name, (m, f) -> m.list(f).varBinary(), l -> l.list().varBinary())
            : getWriter(name, MapWriter::varBinary, ListWriter::varBinary);
        return new DrillVarBinaryConverter(writer, mutator.getManagedBuffer());
      };
      return Optional.ofNullable(type.getLogicalTypeAnnotation())
          .map(typeAnnotation -> typeAnnotation.accept(typeAnnotationVisitor))
          .flatMap(Function.identity())
          .orElseGet(converterSupplier);
    }
    case FIXED_LEN_BYTE_ARRAY: {
      // Visitor handles the annotations that have a dedicated converter
      // (decimal, interval).
      LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<PrimitiveConverter> typeAnnotationVisitor = new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<PrimitiveConverter>() {
        @Override
        public Optional<PrimitiveConverter> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
          ParquetReaderUtility.checkDecimalTypeEnabled(options);
          return Optional.of(getVarDecimalConverter(name, type));
        }
        @Override
        public Optional<PrimitiveConverter> visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) {
          IntervalWriter writer = type.isRepetition(Repetition.REPEATED)
              ? getWriter(name, (m, f) -> m.list(f).interval(), l -> l.list().interval())
              : getWriter(name, MapWriter::interval, ListWriter::interval);
          return Optional.of(new DrillFixedLengthByteArrayToInterval(writer));
        }
      };
      // Fallback used when there is no annotation or the visitor yields no converter.
      Supplier<PrimitiveConverter> converterSupplier = () -> {
        VarBinaryWriter writer = type.isRepetition(Repetition.REPEATED)
            ? getWriter(name, (m, f) -> m.list(f).varBinary(), l -> l.list().varBinary())
            : getWriter(name, MapWriter::varBinary, ListWriter::varBinary);
        return new DrillFixedBinaryToVarbinaryConverter(writer, type.getTypeLength(), mutator.getManagedBuffer());
      };
      return Optional.ofNullable(type.getLogicalTypeAnnotation())
          .map(typeAnnotation -> typeAnnotation.accept(typeAnnotationVisitor))
          .flatMap(Function.identity())
          .orElseGet(converterSupplier);
    }
    default:
      throw new UnsupportedOperationException("Unsupported type: " + type.getPrimitiveTypeName());
  }
}