public int run()

in parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java [122:198]


  /**
   * Converts the configured CSV inputs ({@code targets}) into a single Parquet file at
   * {@code outputPath}.
   *
   * <p>If no Avro schema file is supplied, a nullable schema is inferred from the inputs and
   * every input must infer to the same schema (compared via Avro parsing canonical form).
   *
   * @return 0 on success
   * @throws IOException if reading an input or writing the Parquet file fails
   * @throws IllegalArgumentException if no targets are given, or inferred schemas disagree
   * @throws RuntimeException wrapping any failure while converting rows, with the failing
   *     record index and file name in the message
   */
  public int run() throws IOException {
    Preconditions.checkArgument(targets != null && !targets.isEmpty(), "CSV path is required.");

    // A header supplied on the command line overrides any header row inside the files.
    if (header != null) {
      noHeader = true;
    }

    CSVProperties csvProps = new CSVProperties.Builder()
        .delimiter(delimiter)
        .escape(escape)
        .quote(quote)
        .header(header)
        .hasHeader(!noHeader)
        .linesToSkip(linesToSkip)
        .charset(charsetName)
        .build();

    Schema schema;
    if (avroSchemaFile != null) {
      // An explicit schema wins; no inference is performed.
      schema = Schemas.fromAvsc(open(avroSchemaFile));
    } else {
      Set<String> requiredColumns = (requiredFields == null)
          ? ImmutableSet.<String>of()
          : ImmutableSet.copyOf(requiredFields);

      // Name the Avro record after the first input's file name, up to its first dot.
      String baseName = new File(targets.get(0)).getName();
      int dot = baseName.indexOf('.');
      String recordName = (dot < 0) ? baseName : baseName.substring(0, dot);

      // Without an explicit schema, all inputs must agree on the inferred one.
      schema = null;
      for (String target : targets) {
        Schema inferred = AvroCSV.inferNullableSchema(recordName, open(target), csvProps, requiredColumns);
        if (schema == null) {
          schema = inferred;
        } else if (!SchemaNormalization.toParsingForm(schema)
            .equals(SchemaNormalization.toParsingForm(inferred))) {
          throw new IllegalArgumentException(target + " seems to have a different schema from others. "
              + "Please specify the correct schema explicitly with the `--schema` option.");
        }
      }
    }

    try (ParquetWriter<Record> writer = AvroParquetWriter.<Record>builder(qualifiedPath(outputPath))
        .withWriterVersion(v2 ? PARQUET_2_0 : PARQUET_1_0)
        .withWriteMode(overwrite ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE)
        .withCompressionCodec(Codecs.parquetCodec(compressionCodecName))
        .withDictionaryEncoding(true)
        .withDictionaryPageSize(dictionaryPageSize)
        .withPageSize(pageSize)
        .withRowGroupSize(rowGroupSize)
        .withDataModel(GenericData.get())
        .withConf(getConf())
        .withSchema(schema)
        .build()) {
      for (String target : targets) {
        long written = 0;
        try (AvroCSVReader<Record> rows =
            new AvroCSVReader<>(open(target), csvProps, schema, Record.class, true)) {
          for (Record row : rows) {
            writer.write(row);
            written++;
          }
        } catch (RuntimeException e) {
          // Report how far we got so the user can find the offending row.
          throw new RuntimeException("Failed on record " + written + " in file " + target, e);
        }
      }
    }

    return 0;
  }