public int run()

in parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java [122:192]


  /**
   * Converts the single CSV file named in {@code targets} to a Parquet file at
   * {@code outputPath}, using either the supplied Avro schema or one inferred
   * from the CSV itself.
   *
   * @return 0 on success
   * @throws IOException if reading the CSV or writing the Parquet file fails
   * @throws RuntimeException wrapping any record-level failure, with the count
   *         of records successfully written before the failure
   */
  public int run() throws IOException {
    Preconditions.checkArgument(targets != null && targets.size() == 1,
        "CSV path is required.");

    if (header != null) {
      // if a header is given on the command line, don't assume one is in the file
      noHeader = true;
    }

    CSVProperties props = new CSVProperties.Builder()
        .delimiter(delimiter)
        .escape(escape)
        .quote(quote)
        .header(header)
        .hasHeader(!noHeader)
        .linesToSkip(linesToSkip)
        .charset(charsetName)
        .build();

    String source = targets.get(0);
    Schema csvSchema = loadOrInferSchema(source, props);

    // Resolve the codec before opening the source so an unknown codec name
    // fails fast, without opening (and then abandoning) the input stream.
    CompressionCodecName codec = Codecs.parquetCodec(compressionCodecName);

    long count = 0;
    try (AvroCSVReader<Record> reader = new AvroCSVReader<>(
        open(source), props, csvSchema, Record.class, true)) {
      try (ParquetWriter<Record> writer = AvroParquetWriter
          .<Record>builder(qualifiedPath(outputPath))
          .withWriterVersion(v2 ? PARQUET_2_0 : PARQUET_1_0)
          .withWriteMode(overwrite ?
              ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE)
          .withCompressionCodec(codec)
          .withDictionaryEncoding(true)
          .withDictionaryPageSize(dictionaryPageSize)
          .withPageSize(pageSize)
          .withRowGroupSize(rowGroupSize)
          .withDataModel(GenericData.get())
          .withConf(getConf())
          .withSchema(csvSchema)
          .build()) {
        for (Record record : reader) {
          writer.write(record);
          count++;
        }
      } catch (RuntimeException e) {
        // count is the number of records written successfully before the failure
        throw new RuntimeException("Failed on record " + count, e);
      }
    }

    return 0;
  }

  /**
   * Returns the schema for the CSV data: the Avro schema read from
   * {@code avroSchemaFile} when one was supplied, otherwise a nullable schema
   * inferred from the CSV content itself.
   */
  private Schema loadOrInferSchema(String source, CSVProperties props)
      throws IOException {
    if (avroSchemaFile != null) {
      return Schemas.fromAvsc(open(avroSchemaFile));
    }

    Set<String> required = (requiredFields == null)
        ? ImmutableSet.<String>of()
        : ImmutableSet.copyOf(requiredFields);

    return AvroCSV.inferNullableSchema(
        recordNameFor(source), open(source), props, required);
  }

  /**
   * Derives an Avro record name from a source path: the file name with
   * everything from the first '.' (the extension) removed.
   */
  private static String recordNameFor(String source) {
    String filename = new File(source).getName();
    int dot = filename.indexOf('.');
    return (dot < 0) ? filename : filename.substring(0, dot);
  }