in parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java [332:402]
/**
 * Opens a data file and exposes its records as an {@link Iterable}.
 *
 * <p>The format is detected from the content of {@code source} and handled as follows:
 * <ul>
 *   <li><b>Parquet</b>: records are read through an Avro-backed {@code ParquetReader}, with
 *       {@code projection} applied both as the requested projection and as the Avro read
 *       schema. Every iterator obtained from the returned {@code Iterable} pulls from the
 *       same reader, so the result can only be traversed once. The reader is not closed by
 *       this method.</li>
 *   <li><b>Avro</b>: records are read with a {@code GenericDatumReader} built from
 *       {@code projection}.</li>
 *   <li><b>anything else</b>: when {@code source} ends with {@code "json"} the content is
 *       read through an {@code AvroJsonReader}; otherwise the file is read as lines of text,
 *       in which case {@code projection} must be {@code null}.</li>
 * </ul>
 *
 * @param source location of the file to read
 * @param projection schema used to project the records; must be {@code null} for plain text
 * @param <D> type of the records produced
 * @return an {@code Iterable} over the records (or text lines) of the file
 * @throws IOException if the file cannot be opened or read
 * @throws IllegalArgumentException if a projection is requested for a plain text file
 */
protected <D> Iterable<D> openDataFile(final String source, Schema projection)
    throws IOException {
  // The probe stream is only needed to sniff the format; close it right away instead of
  // leaking it. Each branch below re-opens the source on its own.
  final Formats.Format format;
  try (java.io.InputStream probe = open(source)) {
    format = Formats.detectFormat(probe);
  }

  switch (format) {
    case PARQUET:
      Configuration conf = new Configuration(getConf());
      // TODO: add these to the reader builder
      AvroReadSupport.setRequestedProjection(conf, projection);
      AvroReadSupport.setAvroReadSchema(conf, projection);
      final ParquetReader<D> parquet = AvroParquetReader.<D>builder(qualifiedPath(source))
          .disableCompatibility()
          .withDataModel(GenericData.get())
          .withConf(conf)
          .build();
      return new Iterable<D>() {
        @Override
        public Iterator<D> iterator() {
          // Look-ahead iterator: one record is always pre-fetched so that hasNext() can
          // answer without touching the reader.
          return new Iterator<D>() {
            // Declaration order matters: hasNext must be initialised before next,
            // because advance() overwrites hasNext while computing the first record.
            private boolean hasNext = false;
            private D next = advance();

            @Override
            public boolean hasNext() {
              return hasNext;
            }

            @Override
            public D next() {
              if (!hasNext) {
                throw new NoSuchElementException();
              }
              D toReturn = next;
              this.next = advance();
              return toReturn;
            }

            // Reads the next record; a null record marks the end of the data.
            private D advance() {
              try {
                D next = parquet.read();
                this.hasNext = (next != null);
                return next;
              } catch (IOException e) {
                throw new RuntimeException(
                    "Failed while reading Parquet file: " + source, e);
              }
            }

            @Override
            public void remove() {
              throw new UnsupportedOperationException("Remove is not supported");
            }
          };
        }
      };

    case AVRO:
      Iterable<D> avroReader = (Iterable<D>) DataFileReader.openReader(
          openSeekable(source), new GenericDatumReader<>(projection));
      return avroReader;

    default:
      if (source.endsWith("json")) {
        // The stream is handed over to the reader, which consumes it while iterating.
        return new AvroJsonReader<>(open(source), projection);
      } else {
        Preconditions.checkArgument(projection == null,
            "Cannot select columns from text files");
        // The result of readLines is returned as-is, so the reader (and the stream
        // underneath it) can be closed before returning.
        // NOTE(review): the text is decoded with the platform default charset, as before.
        try (InputStreamReader reader = new InputStreamReader(open(source))) {
          // Callers asking for text lines are expected to bind D to String.
          @SuppressWarnings("unchecked")
          Iterable<D> lines = (Iterable<D>) CharStreams.readLines(reader);
          return lines;
        }
      }
  }
}