in cpp/src/graphar/filesystem.cc [217:263]
/**
 * @brief Write an Arrow table to a single file in the requested format.
 *
 * The parent directory of `path` is created on a best-effort basis before the
 * output stream is opened. CSV output includes a header row and quotes values
 * only when needed; Parquet and ORC output use ZSTD compression. ORC is only
 * available when the build defines ARROW_ORC.
 *
 * @param table The table to write.
 * @param file_type The target file format (CSV, PARQUET or ORC).
 * @param path The destination path, opened through arrow_fs_.
 * @return Status::OK() on success, an error status if opening, writing or
 *         closing the file fails, or Status::Invalid for an unsupported
 *         file type.
 */
Status FileSystem::WriteTableToFile(const std::shared_ptr<arrow::Table>& table,
                                    FileType file_type,
                                    const std::string& path) const noexcept {
  // Try to create the parent directory; oss filesystem may not support this,
  // so the result is ignored. A path without any '/' has no parent component:
  // skip it, otherwise the file path itself would be created as a directory.
  const auto last_slash = path.rfind('/');
  if (last_slash != std::string::npos) {
    ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, last_slash)));
  }
  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto output_stream,
                                       arrow_fs_->OpenOutputStream(path));
  switch (file_type) {
  case FileType::CSV: {
    auto write_options = arrow::csv::WriteOptions::Defaults();
    write_options.include_header = true;
    write_options.quoting_style = arrow::csv::QuotingStyle::Needed;
    GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
        auto writer, arrow::csv::MakeCSVWriter(output_stream.get(),
                                               table->schema(), write_options));
    RETURN_NOT_ARROW_OK(writer->WriteTable(*table));
    RETURN_NOT_ARROW_OK(writer->Close());
    break;
  }
  case FileType::PARQUET: {
    parquet::WriterProperties::Builder builder;
    builder.compression(arrow::Compression::type::ZSTD);  // enable compression
    // 64 * 1024 * 1024 is passed as WriteTable's chunk_size argument.
    RETURN_NOT_ARROW_OK(parquet::arrow::WriteTable(
        *table, arrow::default_memory_pool(), output_stream, 64 * 1024 * 1024,
        builder.build(), parquet::default_arrow_writer_properties()));
    break;
  }
#ifdef ARROW_ORC
  case FileType::ORC: {
    auto writer_options = arrow::adapters::orc::WriteOptions();
    writer_options.compression = arrow::Compression::type::ZSTD;
    GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
        auto writer, arrow::adapters::orc::ORCFileWriter::Open(
                         output_stream.get(), writer_options));
    RETURN_NOT_ARROW_OK(writer->Write(*table));
    RETURN_NOT_ARROW_OK(writer->Close());
    break;
  }
#endif
  default:
    return Status::Invalid(
        "Unsupported file type: ", FileTypeToString(file_type), " for wrting.");
  }
  // Close the stream explicitly so that a failure while finalizing the file is
  // reported to the caller instead of being left to the stream's destructor.
  // Skip the call if a format writer has already closed the stream.
  if (!output_stream->closed()) {
    RETURN_NOT_ARROW_OK(output_stream->Close());
  }
  return Status::OK();
}