in parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java [122:192]
public int run() throws IOException {
  // Converts the single CSV target into a Parquet file written to outputPath.
  //
  // The Avro schema for the records is either parsed from avroSchemaFile or,
  // when no schema file is configured, inferred from the CSV data itself.
  // Every record read from the CSV is then written to the Parquet writer.
  //
  // Returns 0 once all records have been written.
  // Throws IllegalArgumentException (via Preconditions) unless exactly one
  // target is given, IOException on I/O failure, and RuntimeException
  // ("Failed on record N") when a runtime failure occurs while writing.
  Preconditions.checkArgument(targets != null && targets.size() == 1,
      "CSV path is required.");

  if (header != null) {
    // if a header is given on the command line, don't assume one is in the file
    noHeader = true;
  }

  CSVProperties props = new CSVProperties.Builder()
      .delimiter(delimiter)
      .escape(escape)
      .quote(quote)
      .header(header)
      .hasHeader(!noHeader)
      .linesToSkip(linesToSkip)
      .charset(charsetName)
      .build();

  String source = targets.get(0);

  Schema csvSchema;
  if (avroSchemaFile != null) {
    // Close the schema stream as soon as the schema has been parsed instead
    // of leaving it open for the rest of the conversion.
    try (java.io.InputStream schemaIn = open(avroSchemaFile)) {
      csvSchema = Schemas.fromAvsc(schemaIn);
    }
  } else {
    Set<String> required = ImmutableSet.of();
    if (requiredFields != null) {
      required = ImmutableSet.copyOf(requiredFields);
    }

    // The record name is the file name up to (not including) its first '.'.
    String filename = new File(source).getName();
    int firstDot = filename.indexOf('.');
    String recordName = firstDot >= 0 ? filename.substring(0, firstDot) : filename;

    // This stream is used only for schema inference; the data is re-opened
    // below for the actual conversion, so release this one right away.
    try (java.io.InputStream sampleIn = open(source)) {
      csvSchema = AvroCSV.inferNullableSchema(
          recordName, sampleIn, props, required);
    }
  }

  // Number of records successfully written so far; reported on failure.
  long count = 0;
  try (AvroCSVReader<Record> reader = new AvroCSVReader<>(
      open(source), props, csvSchema, Record.class, true)) {
    CompressionCodecName codec = Codecs.parquetCodec(compressionCodecName);
    try (ParquetWriter<Record> writer = AvroParquetWriter
        .<Record>builder(qualifiedPath(outputPath))
        .withWriterVersion(v2 ? PARQUET_2_0 : PARQUET_1_0)
        .withWriteMode(overwrite ?
            ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE)
        .withCompressionCodec(codec)
        .withDictionaryEncoding(true)
        .withDictionaryPageSize(dictionaryPageSize)
        .withPageSize(pageSize)
        .withRowGroupSize(rowGroupSize)
        .withDataModel(GenericData.get())
        .withConf(getConf())
        .withSchema(csvSchema)
        .build()) {
      for (Record record : reader) {
        writer.write(record);
        count++;
      }
    } catch (RuntimeException e) {
      // Add the record position so the failing input row can be located.
      throw new RuntimeException("Failed on record " + count, e);
    }
  }

  return 0;
}