in parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java [122:198]
/**
 * Converts one or more CSV files into a single Parquet file at {@code outputPath}.
 *
 * <p>If {@code --schema} was given, that Avro schema is used; otherwise a nullable
 * schema is inferred from every input file and all inputs must agree on it.
 *
 * @return 0 on success
 * @throws IOException if reading an input or writing the Parquet file fails
 * @throws IllegalArgumentException if inferred schemas differ between input files
 */
public int run() throws IOException {
  Preconditions.checkArgument(targets != null && !targets.isEmpty(), "CSV path is required.");

  if (header != null) {
    // if a header is given on the command line, don't assume one is in the file
    noHeader = true;
  }

  CSVProperties props = new CSVProperties.Builder()
      .delimiter(delimiter)
      .escape(escape)
      .quote(quote)
      .header(header)
      .hasHeader(!noHeader)
      .linesToSkip(linesToSkip)
      .charset(charsetName)
      .build();

  Schema csvSchema = (avroSchemaFile != null)
      ? Schemas.fromAvsc(open(avroSchemaFile))
      : inferSchemaFromTargets(props);

  try (ParquetWriter<Record> writer = AvroParquetWriter.<Record>builder(qualifiedPath(outputPath))
      .withWriterVersion(v2 ? PARQUET_2_0 : PARQUET_1_0)
      .withWriteMode(overwrite ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE)
      .withCompressionCodec(Codecs.parquetCodec(compressionCodecName))
      .withDictionaryEncoding(true)
      .withDictionaryPageSize(dictionaryPageSize)
      .withPageSize(pageSize)
      .withRowGroupSize(rowGroupSize)
      .withDataModel(GenericData.get())
      .withConf(getConf())
      .withSchema(csvSchema)
      .build()) {
    for (String target : targets) {
      appendTarget(writer, props, csvSchema, target);
    }
  }
  return 0;
}

/**
 * Infers a nullable Avro schema from every input file and verifies all inputs
 * share the same one (by Avro parsing canonical form).
 *
 * <p>The record name is the first target's filename, truncated at the first dot.
 *
 * @param props CSV parsing options built from the command-line flags
 * @return the schema shared by all targets
 * @throws IOException if an input file cannot be read
 * @throws IllegalArgumentException if two targets infer different schemas
 */
private Schema inferSchemaFromTargets(CSVProperties props) throws IOException {
  Set<String> required =
      (requiredFields != null) ? ImmutableSet.copyOf(requiredFields) : ImmutableSet.of();

  // Derive the Avro record name from the first file's name, dropping any extension(s).
  String filename = new File(targets.get(0)).getName();
  int dot = filename.indexOf('.');
  String recordName = (dot >= 0) ? filename.substring(0, dot) : filename;

  // If the schema is not explicitly provided,
  // ensure that all input files share the same one.
  Schema csvSchema = null;
  for (String target : targets) {
    Schema schema = AvroCSV.inferNullableSchema(recordName, open(target), props, required);
    if (csvSchema == null) {
      csvSchema = schema;
    } else if (!SchemaNormalization.toParsingForm(csvSchema)
        .equals(SchemaNormalization.toParsingForm(schema))) {
      throw new IllegalArgumentException(target + " seems to have a different schema from others. "
          + "Please specify the correct schema explicitly with the `--schema` option.");
    }
  }
  return csvSchema;
}

/**
 * Streams one CSV file's records into the Parquet writer, wrapping any parse or
 * write failure with the record number and file name for easier diagnosis.
 *
 * @param writer the open Parquet writer shared across all targets
 * @param props CSV parsing options
 * @param csvSchema the schema every record must conform to
 * @param target path of the CSV file to read
 * @throws IOException if the reader cannot be opened or closed
 */
private void appendTarget(
    ParquetWriter<Record> writer, CSVProperties props, Schema csvSchema, String target)
    throws IOException {
  long count = 0;
  try (AvroCSVReader<Record> reader =
      new AvroCSVReader<>(open(target), props, csvSchema, Record.class, true)) {
    for (Record record : reader) {
      writer.write(record);
      count++;
    }
  } catch (RuntimeException e) {
    // count identifies the first record that failed within this file
    throw new RuntimeException("Failed on record " + count + " in file " + target, e);
  }
}