cpp/src/graphar/filesystem.cc (310 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifdef ARROW_ORC
#include "arrow/adapters/orc/adapter.h"
#endif
#include "arrow/api.h"
#include "arrow/csv/api.h"
#include "arrow/dataset/api.h"
#if defined(ARROW_VERSION) && ARROW_VERSION <= 12000000
#include "arrow/dataset/file_json.h"
#endif
#include "arrow/filesystem/api.h"
#include "arrow/filesystem/s3fs.h"
#include "arrow/ipc/writer.h"
#include "parquet/arrow/writer.h"
#include "simple-uri-parser/uri_parser.h"
#include "graphar/expression.h"
#include "graphar/filesystem.h"
#include "graphar/fwd.h"
#include "graphar/general_params.h"
namespace graphar::detail {
/**
 * Rebuild a binary-like array with a wider offset type, e.g.
 * StringArray (int32 offsets) -> LargeStringArray (int64 offsets).
 *
 * @param in the source array (template parameter U, e.g. arrow::StringArray)
 * @param to_type the destination Arrow type (e.g. arrow::large_utf8())
 * @param out receives the rebuilt array (template parameter T)
 */
template <typename U, typename T>
static Status CastToLargeOffsetArray(
    const std::shared_ptr<arrow::Array>& in,
    const std::shared_ptr<arrow::DataType>& to_type,
    std::shared_ptr<arrow::Array>& out) {  // NOLINT(runtime/references)
  auto array_data = in->data()->Copy();
  auto offset = array_data->buffers[1];
  using from_offset_type = typename U::offset_type;
  using to_offset_type = typename T::offset_type;
  // Fix: the offsets buffer may be absent (legal for zero-length arrays);
  // the previous code null-checked it but then dereferenced it anyway via
  // offset->size(). Guard both the data pointer and the element count.
  const from_offset_type* raw_value_offsets =
      offset == NULLPTR
          ? NULLPTR
          : reinterpret_cast<const from_offset_type*>(offset->data());
  const size_t num_offsets =
      offset == NULLPTR ? 0 : offset->size() / sizeof(from_offset_type);
  // Widen each offset value into the destination offset type.
  std::vector<to_offset_type> to_offset(num_offsets);
  for (size_t i = 0; i < num_offsets; ++i) {
    to_offset[i] = raw_value_offsets[i];
  }
  std::shared_ptr<arrow::Buffer> buffer;
  arrow::TypedBufferBuilder<to_offset_type> buffer_builder;
  RETURN_NOT_ARROW_OK(
      buffer_builder.Append(to_offset.data(), to_offset.size()));
  RETURN_NOT_ARROW_OK(buffer_builder.Finish(&buffer));
  // Swap in the new type and offsets buffer; the value buffer (buffers[2])
  // and null bitmap (buffers[0]) are shared with the input unchanged.
  array_data->type = to_type;
  array_data->buffers[1] = buffer;
  out = arrow::MakeArray(array_data);
  RETURN_NOT_ARROW_OK(out->ValidateFull());
  return Status::OK();
}

/**
 * Chunk-wise variant: applies the offset widening to every chunk of a
 * ChunkedArray and reassembles the result.
 */
template <typename U, typename T>
static Status CastToLargeOffsetArray(
    const std::shared_ptr<arrow::ChunkedArray>& in,
    const std::shared_ptr<arrow::DataType>& to_type,
    std::shared_ptr<arrow::ChunkedArray>& out) {  // NOLINT(runtime/references)
  std::vector<std::shared_ptr<arrow::Array>> chunks;
  for (auto const& chunk : in->chunks()) {
    std::shared_ptr<arrow::Array> array;
    auto status = CastToLargeOffsetArray<U, T>(chunk, to_type, array);
    GAR_RETURN_NOT_OK(status);
    chunks.emplace_back(array);
  }
  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(out, arrow::ChunkedArray::Make(chunks));
  return Status::OK();
}
}  // namespace graphar::detail
namespace graphar {
namespace ds = arrow::dataset;
std::shared_ptr<ds::FileFormat> FileSystem::GetFileFormat(
    const FileType type) const {
  // Map a GraphAR payload file type to the matching Arrow dataset file
  // format; unknown (or compiled-out) types yield nullptr.
  if (type == CSV) {
    return std::make_shared<ds::CsvFileFormat>();
  }
  if (type == PARQUET) {
    return std::make_shared<ds::ParquetFileFormat>();
  }
  if (type == JSON) {
    return std::make_shared<ds::JsonFileFormat>();
  }
#ifdef ARROW_ORC
  // ORC support is only available when Arrow was built with it.
  if (type == ORC) {
    return std::make_shared<ds::OrcFileFormat>();
  }
#endif
  return nullptr;
}
Result<std::shared_ptr<arrow::Table>> FileSystem::ReadFileToTable(
    const std::string& path, FileType file_type,
    const util::FilterOptions& options) const noexcept {
  std::shared_ptr<ds::FileFormat> format = GetFileFormat(file_type);
  // Build a single-file dataset so the Arrow scanner can apply row filters
  // and column projection while reading.
  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
      auto factory, arrow::dataset::FileSystemDatasetFactory::Make(
                        arrow_fs_, {path}, format,
                        arrow::dataset::FileSystemFactoryOptions()));
  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto dataset, factory->Finish());
  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto scan_builder, dataset->NewScan());
  // Apply the row filter and select the specified columns
  if (options.filter) {
    GAR_ASSIGN_OR_RAISE(auto filter, options.filter->Evaluate());
    RETURN_NOT_ARROW_OK(scan_builder->Filter(filter));
  }
  if (options.columns) {
    RETURN_NOT_ARROW_OK(scan_builder->Project(*options.columns));
  }
  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto scanner, scan_builder->Finish());
  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto table, scanner->ToTable());
  // cast string array to large string array as we need concatenate chunks in
  // some places, e.g., in vineyard
  for (int i = 0; i < table->num_columns(); ++i) {
    // Decide the widened target type for this column (if any).
    std::shared_ptr<arrow::DataType> type = table->column(i)->type();
    if (type->id() == arrow::Type::STRING) {
      type = arrow::large_utf8();
    } else if (type->id() == arrow::Type::BINARY) {
      type = arrow::large_binary();
    }
    // Column already has the target type (i.e. it was neither STRING nor
    // BINARY) -- nothing to cast.
    if (type->Equals(table->column(i)->type())) {
      continue;
    }
    // do casting
    auto field = table->field(i)->WithType(type);
    std::shared_ptr<arrow::ChunkedArray> chunked_array;
    if (table->num_rows() == 0) {
      // Empty table: no offsets to rewrite, just make an empty column of
      // the target type.
      GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
          chunked_array, arrow::ChunkedArray::MakeEmpty(type));
    } else if (type->Equals(arrow::large_utf8())) {
      auto status = detail::CastToLargeOffsetArray<arrow::StringArray,
                                                   arrow::LargeStringArray>(
          table->column(i), type, chunked_array);
      GAR_RETURN_NOT_OK(status);
    } else if (type->Equals(arrow::large_binary())) {
      auto status = detail::CastToLargeOffsetArray<arrow::BinaryArray,
                                                   arrow::LargeBinaryArray>(
          table->column(i), type, chunked_array);
      GAR_RETURN_NOT_OK(status);
    } else {
      // noop
      chunked_array = table->column(i);
    }
    // Replace column i in place; RemoveColumn + AddColumn keeps the column
    // order stable for the loop index.
    GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(table, table->RemoveColumn(i));
    GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
        table, table->AddColumn(i, field, chunked_array));
  }
  return table;
}
template <typename T>
Result<T> FileSystem::ReadFileToValue(const std::string& path) const noexcept {
  // Read exactly sizeof(T) raw bytes from the file at `path` into a T.
  T value;
  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto stream,
                                       arrow_fs_->OpenInputStream(path));
  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto read_bytes,
                                       stream->Read(sizeof(T), &value));
  ARROW_UNUSED(read_bytes);
  return value;
}
template <>
Result<std::string> FileSystem::ReadFileToValue(const std::string& path) const
    noexcept {
  // std::string specialization: slurp the entire file content.
  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto file,
                                       arrow_fs_->OpenInputFile(path));
  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto file_size, file->GetSize());
  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto content,
                                       file->ReadAt(0, file_size));
  return content->ToString();
}
template <typename T>
Status FileSystem::WriteValueToFile(const T& value,
                                    const std::string& path) const noexcept {
  // Best-effort creation of the parent directory; OSS-like filesystems may
  // not support CreateDir, so the status is deliberately ignored.
  const auto parent_dir = path.substr(0, path.find_last_of("/"));
  ARROW_UNUSED(arrow_fs_->CreateDir(parent_dir));
  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto out_stream,
                                       arrow_fs_->OpenOutputStream(path));
  // Write the raw bytes of the value, then close to flush.
  RETURN_NOT_ARROW_OK(out_stream->Write(&value, sizeof(T)));
  RETURN_NOT_ARROW_OK(out_stream->Close());
  return Status::OK();
}
template <>
Status FileSystem::WriteValueToFile(const std::string& value,
                                    const std::string& path) const noexcept {
  // std::string specialization: write the character data, not the object.
  // Best-effort creation of the parent directory; OSS-like filesystems may
  // not support CreateDir, so the status is deliberately ignored.
  const auto parent_dir = path.substr(0, path.find_last_of("/"));
  ARROW_UNUSED(arrow_fs_->CreateDir(parent_dir));
  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto out_stream,
                                       arrow_fs_->OpenOutputStream(path));
  RETURN_NOT_ARROW_OK(out_stream->Write(value.c_str(), value.size()));
  RETURN_NOT_ARROW_OK(out_stream->Close());
  return Status::OK();
}
/**
 * Write an Arrow table to a file in the given format (CSV, Parquet, or ORC
 * when Arrow was built with ORC support).
 *
 * @param table the table to write
 * @param file_type target on-disk format
 * @param path destination path on this filesystem
 * @return Status::OK() on success; Invalid for unsupported formats
 */
Status FileSystem::WriteTableToFile(const std::shared_ptr<arrow::Table>& table,
                                    FileType file_type,
                                    const std::string& path) const noexcept {
  // try to create the directory, oss filesystem may not support this, ignore
  ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto output_stream,
                                       arrow_fs_->OpenOutputStream(path));
  switch (file_type) {
  case FileType::CSV: {
    auto write_options = arrow::csv::WriteOptions::Defaults();
    write_options.include_header = true;
    write_options.quoting_style = arrow::csv::QuotingStyle::Needed;
    GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
        auto writer, arrow::csv::MakeCSVWriter(output_stream.get(),
                                               table->schema(), write_options));
    RETURN_NOT_ARROW_OK(writer->WriteTable(*table));
    RETURN_NOT_ARROW_OK(writer->Close());
    break;
  }
  case FileType::PARQUET: {
    parquet::WriterProperties::Builder builder;
    builder.compression(arrow::Compression::type::ZSTD);  // enable compression
    // 64 MiB row groups.
    RETURN_NOT_ARROW_OK(parquet::arrow::WriteTable(
        *table, arrow::default_memory_pool(), output_stream, 64 * 1024 * 1024,
        builder.build(), parquet::default_arrow_writer_properties()));
    break;
  }
#ifdef ARROW_ORC
  case FileType::ORC: {
    auto writer_options = arrow::adapters::orc::WriteOptions();
    writer_options.compression = arrow::Compression::type::ZSTD;
    GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
        auto writer, arrow::adapters::orc::ORCFileWriter::Open(
                         output_stream.get(), writer_options));
    RETURN_NOT_ARROW_OK(writer->Write(*table));
    RETURN_NOT_ARROW_OK(writer->Close());
    break;
  }
#endif
  default:
    // Fixed typo in the user-facing message ("wrting" -> "writing").
    return Status::Invalid("Unsupported file type: ",
                           FileTypeToString(file_type), " for writing.");
  }
  return Status::OK();
}
/**
 * Write a label table to a Parquet file with RLE encoding and ZSTD
 * compression (labels are highly repetitive, so RLE compresses well).
 *
 * @param table the label table to write
 * @param path destination path on this filesystem
 */
Status FileSystem::WriteLabelTableToFile(
    const std::shared_ptr<arrow::Table>& table, const std::string& path) const
    noexcept {
  // try to create the directory, oss filesystem may not support this, ignore
  ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto output_stream,
                                       arrow_fs_->OpenOutputStream(path));
  // (Removed unused locals `schema`/`column_num` that were never read.)
  parquet::WriterProperties::Builder builder;
  builder.compression(arrow::Compression::type::ZSTD);  // enable compression
  builder.encoding(parquet::Encoding::RLE);
  // 64 MiB row groups.
  RETURN_NOT_ARROW_OK(parquet::arrow::WriteTable(
      *table, arrow::default_memory_pool(), output_stream, 64 * 1024 * 1024,
      builder.build(), parquet::default_arrow_writer_properties()));
  return Status::OK();
}
Status FileSystem::CopyFile(const std::string& src_path,
                            const std::string& dst_path) const noexcept {
  // Best-effort creation of the destination's parent directory; OSS-like
  // filesystems may not support CreateDir, so the status is ignored.
  const auto parent_dir = dst_path.substr(0, dst_path.find_last_of("/"));
  ARROW_UNUSED(arrow_fs_->CreateDir(parent_dir));
  RETURN_NOT_ARROW_OK(arrow_fs_->CopyFile(src_path, dst_path));
  return Status::OK();
}
Result<IdType> FileSystem::GetFileNumOfDir(const std::string& dir_path,
                                           bool recursive) const noexcept {
  // Count the entries under `dir_path`, optionally descending into
  // subdirectories; a missing directory yields an error (allow_not_found
  // is left false).
  arrow::fs::FileSelector selector;
  selector.base_dir = dir_path;
  selector.allow_not_found = false;  // if dir_path not exist, return error
  selector.recursive = recursive;
  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto infos,
                                       arrow_fs_->GetFileInfo(selector));
  return static_cast<IdType>(infos.size());
}
FileSystem::~FileSystem() {}
/// Create a FileSystem from a URI or an absolute local path.
/// On success, *out_path (if non-null) receives the path component to use
/// with the returned filesystem (for s3/gs URIs this is "bucket/path").
Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(
    const std::string& uri_string, std::string* out_path) {
  if (uri_string.length() >= 1 && uri_string[0] == '/') {
    // if the uri_string is an absolute path, we need to create a local file
    GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
        auto arrow_fs,
        arrow::fs::FileSystemFromUriOrPath(uri_string, out_path));
    // arrow would delete the last slash, so use uri string
    if (out_path != nullptr) {
      *out_path = uri_string;
    }
    return std::make_shared<FileSystem>(arrow_fs);
  }
  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
      auto arrow_fs, arrow::fs::FileSystemFromUriOrPath(uri_string));
  // Parse the URI ourselves to recover the scheme/host/path pieces that
  // Arrow does not hand back.
  auto uri = uri::parse_uri(uri_string);
  if (uri.error != uri::Error::None) {
    return Status::Invalid("Failed to parse URI: ", uri_string);
  }
  if (out_path != nullptr) {
    if (uri.scheme == "file" || uri.scheme == "hdfs" || uri.scheme.empty()) {
      *out_path = uri.path;
    } else if (uri.scheme == "s3" || uri.scheme == "gs") {
      // bucket name is the host, path is the path
      *out_path = uri.authority.host + uri.path;
    } else {
      return Status::Invalid("Unrecognized filesystem type in URI: ",
                             uri_string);
    }
  }
  return std::make_shared<FileSystem>(arrow_fs);
}
// arrow::fs::InitializeS3 and arrow::fs::FinalizeS3 need arrow_version >= 15
/// Initialize Arrow's global S3 subsystem. With Arrow < 15 this is a no-op
/// (arrow::fs::InitializeS3/FinalizeS3 need arrow_version >= 15, per the
/// note above); `options` is then intentionally unused.
Status InitializeS3() {
#if defined(ARROW_VERSION) && ARROW_VERSION > 14000000
  auto options = arrow::fs::S3GlobalOptions::Defaults();
#else
  // Older Arrow has no Defaults(); build the options by hand.
  arrow::fs::S3GlobalOptions options;
  options.log_level = arrow::fs::S3LogLevel::Fatal;
#endif
#if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
  RETURN_NOT_ARROW_OK(arrow::fs::InitializeS3(options));
#endif
  return Status::OK();
}
/// Tear down Arrow's global S3 subsystem; a no-op with Arrow < 15
/// (matching InitializeS3).
Status FinalizeS3() {
#if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
  RETURN_NOT_ARROW_OK(arrow::fs::FinalizeS3());
#endif
  return Status::OK();
}
/// explicit template instantiation for IdType (the std::string overloads
/// above are full specializations and need no instantiation here)
template Result<IdType> FileSystem::ReadFileToValue<IdType>(
    const std::string&) const noexcept;
/// explicit template instantiation for IdType
template Status FileSystem::WriteValueToFile<IdType>(const IdType&,
                                                     const std::string&) const
    noexcept;
} // namespace graphar