in tools/src/CSVFileImport.cc [266:464]
int main(int argc, char* argv[]) {
std::string input;
std::string output;
std::string schema;
std::string timezoneName = "GMT";
uint64_t stripeSize = (128 << 20); // 128M
uint64_t blockSize = 64 << 10; // 64K
uint64_t batchSize = 1024;
orc::CompressionKind compression = orc::CompressionKind_ZLIB;
static struct option longOptions[] = {{"help", no_argument, nullptr, 'h'},
{"metrics", no_argument, nullptr, 'm'},
{"delimiter", required_argument, nullptr, 'd'},
{"stripe", required_argument, nullptr, 's'},
{"block", required_argument, nullptr, 'c'},
{"batch", required_argument, nullptr, 'b'},
{"timezone", required_argument, nullptr, 't'},
{nullptr, 0, nullptr, 0}};
bool helpFlag = false;
bool showMetrics = false;
int opt;
char* tail;
do {
opt = getopt_long(argc, argv, "d:s:c:b:t:mh", longOptions, nullptr);
switch (opt) {
case 'h':
helpFlag = true;
opt = -1;
break;
case 'm':
showMetrics = true;
break;
case 'd':
gDelimiter = optarg[0];
break;
case 's':
stripeSize = strtoul(optarg, &tail, 10);
if (*tail != '\0') {
fprintf(stderr, "The --stripe parameter requires an integer option.\n");
return 1;
}
break;
case 'c':
blockSize = strtoul(optarg, &tail, 10);
if (*tail != '\0') {
fprintf(stderr, "The --block parameter requires an integer option.\n");
return 1;
}
break;
case 'b':
batchSize = strtoul(optarg, &tail, 10);
if (*tail != '\0') {
fprintf(stderr, "The --batch parameter requires an integer option.\n");
return 1;
}
break;
case 't':
timezoneName = std::string(optarg);
break;
}
} while (opt != -1);
argc -= optind;
argv += optind;
if (argc != 3 || helpFlag) {
usage();
return 1;
}
schema = argv[0];
input = argv[1];
output = argv[2];
std::cout << GetDate() << " Start importing Orc file..." << std::endl;
std::unique_ptr<orc::Type> fileType = orc::Type::buildTypeFromString(schema);
double totalElapsedTime = 0.0;
clock_t totalCPUTime = 0;
typedef std::list<orc::DataBuffer<char>> DataBufferList;
DataBufferList bufferList;
orc::WriterOptions options;
orc::WriterMetrics metrics;
options.setStripeSize(stripeSize);
options.setCompressionBlockSize(blockSize);
options.setCompression(compression);
options.setTimezoneName(timezoneName);
options.setWriterMetrics(showMetrics ? &metrics : nullptr);
std::unique_ptr<orc::OutputStream> outStream = orc::writeLocalFile(output);
std::unique_ptr<orc::Writer> writer = orc::createWriter(*fileType, outStream.get(), options);
std::unique_ptr<orc::ColumnVectorBatch> rowBatch = writer->createRowBatch(batchSize);
bool eof = false;
std::string line;
std::vector<std::string> data; // buffer that holds a batch of rows in raw text
std::ifstream finput(input.c_str());
while (!eof) {
uint64_t numValues = 0; // num of lines read in a batch
data.clear();
memset(rowBatch->notNull.data(), 1, batchSize);
// read a batch of lines from the input file
for (uint64_t i = 0; i < batchSize; ++i) {
if (!std::getline(finput, line)) {
eof = true;
break;
}
data.push_back(line);
++numValues;
}
if (numValues != 0) {
orc::StructVectorBatch* structBatch = dynamic_cast<orc::StructVectorBatch*>(rowBatch.get());
structBatch->numElements = numValues;
for (uint64_t i = 0; i < structBatch->fields.size(); ++i) {
const orc::Type* subType = fileType->getSubtype(i);
switch (subType->getKind()) {
case orc::BYTE:
case orc::INT:
case orc::SHORT:
case orc::LONG:
fillLongValues(data, structBatch->fields[i], numValues, i);
break;
case orc::STRING:
case orc::CHAR:
case orc::VARCHAR:
case orc::BINARY:
bufferList.emplace_back(*orc::getDefaultPool(), 1 * 1024 * 1024);
fillStringValues(data, structBatch->fields[i], numValues, i, bufferList.back());
break;
case orc::FLOAT:
case orc::DOUBLE:
fillDoubleValues(data, structBatch->fields[i], numValues, i);
break;
case orc::DECIMAL:
fillDecimalValues(data, structBatch->fields[i], numValues, i, subType->getScale(),
subType->getPrecision());
break;
case orc::BOOLEAN:
fillBoolValues(data, structBatch->fields[i], numValues, i);
break;
case orc::DATE:
fillDateValues(data, structBatch->fields[i], numValues, i);
break;
case orc::TIMESTAMP:
case orc::TIMESTAMP_INSTANT:
fillTimestampValues(data, structBatch->fields[i], numValues, i);
break;
case orc::STRUCT:
case orc::LIST:
case orc::MAP:
case orc::UNION:
throw std::runtime_error(subType->toString() + " is not supported yet.");
}
}
struct timeval t_start, t_end;
gettimeofday(&t_start, nullptr);
clock_t c_start = clock();
writer->add(*rowBatch);
totalCPUTime += (clock() - c_start);
gettimeofday(&t_end, nullptr);
totalElapsedTime += (static_cast<double>(t_end.tv_sec - t_start.tv_sec) * 1000000.0 +
static_cast<double>(t_end.tv_usec - t_start.tv_usec)) /
1000000.0;
}
}
struct timeval t_start, t_end;
gettimeofday(&t_start, nullptr);
clock_t c_start = clock();
writer->close();
totalCPUTime += (clock() - c_start);
gettimeofday(&t_end, nullptr);
totalElapsedTime += (static_cast<double>(t_end.tv_sec - t_start.tv_sec) * 1000000.0 +
static_cast<double>(t_end.tv_usec - t_start.tv_usec)) /
1000000.0;
std::cout << GetDate() << " Finish importing Orc file." << std::endl;
std::cout << GetDate() << " Total writer elasped time: " << totalElapsedTime << "s." << std::endl;
std::cout << GetDate()
<< " Total writer CPU time: " << static_cast<double>(totalCPUTime) / CLOCKS_PER_SEC
<< "s." << std::endl;
if (showMetrics) {
std::cout << GetDate() << " IO block lantency: "
<< static_cast<double>(metrics.IOBlockingLatencyUs) / 1000000.0 << "s." << std::endl;
std::cout << GetDate() << " IO count: " << metrics.IOCount << std::endl;
}
return 0;
}