int main()

in tools/src/CSVFileImport.cc [266:464]


/**
 * Entry point of the CSV import tool: reads a delimited text file and writes
 * its rows into an ORC file.
 *
 * Command line: [options] <schema> <input> <output>
 *   -h, --help             print the usage text and exit
 *   -m, --metrics          print writer IO metrics when the import finishes
 *   -d, --delimiter <c>    field delimiter (first character of the argument)
 *   -s, --stripe <n>       stripe size handed to the writer (default 128M)
 *   -c, --block <n>        compression block size (default 64K)
 *   -b, --batch <n>        rows per batch, must be greater than zero
 *                          (default 1024)
 *   -t, --timezone <name>  writer timezone name (default "GMT")
 *
 * The schema must be a struct whose fields are primitive types; nested
 * STRUCT/LIST/MAP/UNION fields are rejected.
 *
 * @return 0 on success, 1 on a usage error, an invalid option value, an
 *         unreadable input file or any exception raised during the import.
 */
int main(int argc, char* argv[]) {
  std::string timezoneName = "GMT";
  uint64_t stripeSize = (128 << 20);  // 128M
  uint64_t blockSize = 64 << 10;      // 64K
  uint64_t batchSize = 1024;
  const orc::CompressionKind compression = orc::CompressionKind_ZLIB;
  bool showMetrics = false;

  static struct option longOptions[] = {{"help", no_argument, nullptr, 'h'},
                                        {"metrics", no_argument, nullptr, 'm'},
                                        {"delimiter", required_argument, nullptr, 'd'},
                                        {"stripe", required_argument, nullptr, 's'},
                                        {"block", required_argument, nullptr, 'c'},
                                        {"batch", required_argument, nullptr, 'b'},
                                        {"timezone", required_argument, nullptr, 't'},
                                        {nullptr, 0, nullptr, 0}};

  // Parses an unsigned decimal option value. The text has to start with a
  // digit because strtoul on its own accepts empty strings, leading
  // whitespace and a sign (so "-1" would silently wrap to a huge value).
  auto parseSize = [](const char* text, const char* optionName, uint64_t& value) -> bool {
    char* tail = nullptr;
    const bool startsWithDigit = text[0] >= '0' && text[0] <= '9';
    if (startsWithDigit) {
      value = strtoul(text, &tail, 10);
    }
    if (!startsWithDigit || *tail != '\0') {
      fprintf(stderr, "The --%s parameter requires an integer option.\n", optionName);
      return false;
    }
    return true;
  };

  int opt;
  while ((opt = getopt_long(argc, argv, "d:s:c:b:t:mh", longOptions, nullptr)) != -1) {
    switch (opt) {
      case 'h':
        usage();
        return 1;
      case 'm':
        showMetrics = true;
        break;
      case 'd':
        // An empty argument would otherwise install '\0' as the delimiter.
        if (optarg[0] == '\0') {
          fprintf(stderr, "The --delimiter parameter requires a non-empty option.\n");
          return 1;
        }
        gDelimiter = optarg[0];
        break;
      case 's':
        if (!parseSize(optarg, "stripe", stripeSize)) {
          return 1;
        }
        break;
      case 'c':
        if (!parseSize(optarg, "block", blockSize)) {
          return 1;
        }
        break;
      case 'b':
        if (!parseSize(optarg, "batch", batchSize)) {
          return 1;
        }
        // A batch of zero rows would never reach end-of-file in the read
        // loop below and the tool would spin forever.
        if (batchSize == 0) {
          fprintf(stderr, "The --batch parameter requires a positive integer option.\n");
          return 1;
        }
        break;
      case 't':
        timezoneName = std::string(optarg);
        break;
      default:
        // '?': getopt_long has already reported the unknown option or the
        // missing argument on stderr.
        usage();
        return 1;
    }
  }

  argc -= optind;
  argv += optind;

  if (argc != 3) {
    usage();
    return 1;
  }

  const std::string schema = argv[0];
  const std::string input = argv[1];
  const std::string output = argv[2];

  try {
    std::cout << GetDate() << " Start importing Orc file..." << std::endl;
    std::unique_ptr<orc::Type> fileType = orc::Type::buildTypeFromString(schema);
    if (fileType->getKind() != orc::STRUCT) {
      fprintf(stderr, "The schema must be a struct type.\n");
      return 1;
    }

    // Open the input before the output file is created so that a bad input
    // path does not leave an empty ORC file behind.
    std::ifstream finput(input.c_str());
    if (!finput.is_open()) {
      fprintf(stderr, "Unable to open input file %s\n", input.c_str());
      return 1;
    }

    double totalElapsedTime = 0.0;
    clock_t totalCPUTime = 0;

    // Wall-clock and CPU time are accumulated around writer calls only.
    struct timeval wallStart;
    clock_t cpuStart = 0;
    auto startTimer = [&]() {
      gettimeofday(&wallStart, nullptr);
      cpuStart = clock();
    };
    auto stopTimer = [&]() {
      totalCPUTime += (clock() - cpuStart);
      struct timeval wallEnd;
      gettimeofday(&wallEnd, nullptr);
      totalElapsedTime += (static_cast<double>(wallEnd.tv_sec - wallStart.tv_sec) * 1000000.0 +
                           static_cast<double>(wallEnd.tv_usec - wallStart.tv_usec)) /
                          1000000.0;
    };

    orc::WriterOptions options;
    orc::WriterMetrics metrics;
    options.setStripeSize(stripeSize);
    options.setCompressionBlockSize(blockSize);
    options.setCompression(compression);
    options.setTimezoneName(timezoneName);
    options.setWriterMetrics(showMetrics ? &metrics : nullptr);

    std::unique_ptr<orc::OutputStream> outStream = orc::writeLocalFile(output);
    std::unique_ptr<orc::Writer> writer = orc::createWriter(*fileType, outStream.get(), options);
    std::unique_ptr<orc::ColumnVectorBatch> rowBatch = writer->createRowBatch(batchSize);

    orc::StructVectorBatch* structBatch = dynamic_cast<orc::StructVectorBatch*>(rowBatch.get());
    if (structBatch == nullptr) {
      fprintf(stderr, "The schema must be a struct type.\n");
      return 1;
    }

    // Backing storage for the string columns of the batch currently being
    // assembled; one buffer per string-like column.
    std::list<orc::DataBuffer<char>> bufferList;

    std::string line;
    std::vector<std::string> data;  // buffer that holds a batch of rows in raw text
    bool eof = false;
    while (!eof) {
      data.clear();
      memset(rowBatch->notNull.data(), 1, batchSize);

      // read a batch of lines from the input file
      while (data.size() < batchSize) {
        if (!std::getline(finput, line)) {
          eof = true;
          break;
        }
        data.push_back(line);
      }

      const uint64_t numValues = data.size();  // num of lines read in a batch
      if (numValues == 0) {
        break;  // only reachable at end of input because batchSize > 0
      }

      structBatch->numElements = numValues;

      for (uint64_t i = 0; i < structBatch->fields.size(); ++i) {
        const orc::Type* subType = fileType->getSubtype(i);
        switch (subType->getKind()) {
          case orc::BYTE:
          case orc::INT:
          case orc::SHORT:
          case orc::LONG:
            fillLongValues(data, structBatch->fields[i], numValues, i);
            break;
          case orc::STRING:
          case orc::CHAR:
          case orc::VARCHAR:
          case orc::BINARY:
            bufferList.emplace_back(*orc::getDefaultPool(), 1 * 1024 * 1024);
            fillStringValues(data, structBatch->fields[i], numValues, i, bufferList.back());
            break;
          case orc::FLOAT:
          case orc::DOUBLE:
            fillDoubleValues(data, structBatch->fields[i], numValues, i);
            break;
          case orc::DECIMAL:
            fillDecimalValues(data, structBatch->fields[i], numValues, i, subType->getScale(),
                              subType->getPrecision());
            break;
          case orc::BOOLEAN:
            fillBoolValues(data, structBatch->fields[i], numValues, i);
            break;
          case orc::DATE:
            fillDateValues(data, structBatch->fields[i], numValues, i);
            break;
          case orc::TIMESTAMP:
          case orc::TIMESTAMP_INSTANT:
            fillTimestampValues(data, structBatch->fields[i], numValues, i);
            break;
          case orc::STRUCT:
          case orc::LIST:
          case orc::MAP:
          case orc::UNION:
            throw std::runtime_error(subType->toString() + " is not supported yet.");
        }
      }

      startTimer();
      writer->add(*rowBatch);
      stopTimer();

      // The row batch is overwritten on the next iteration, so the writer is
      // expected to have consumed its contents once add() returns. Dropping
      // the string buffers here keeps memory bounded instead of growing by
      // 1 MiB per string column for every batch of the input.
      bufferList.clear();
    }

    startTimer();
    writer->close();
    stopTimer();

    std::cout << GetDate() << " Finish importing Orc file." << std::endl;
    std::cout << GetDate() << " Total writer elasped time: " << totalElapsedTime << "s."
              << std::endl;
    std::cout << GetDate()
              << " Total writer CPU time: " << static_cast<double>(totalCPUTime) / CLOCKS_PER_SEC
              << "s." << std::endl;
    if (showMetrics) {
      std::cout << GetDate() << " IO block lantency: "
                << static_cast<double>(metrics.IOBlockingLatencyUs) / 1000000.0 << "s."
                << std::endl;
      std::cout << GetDate() << " IO count: " << metrics.IOCount << std::endl;
    }
    return 0;
  } catch (const std::exception& ex) {
    // Schema parsing, file creation and unsupported column types all report
    // failures through exceptions; turn them into a message and exit code
    // instead of letting them terminate the process.
    fprintf(stderr, "Caught exception: %s\n", ex.what());
    return 1;
  }
}