analytical_engine/core/loader/arrow_fragment_loader.h (573 lines of code) (raw):

/** Copyright 2020 Alibaba Group Holding Limited. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef ANALYTICAL_ENGINE_CORE_LOADER_ARROW_FRAGMENT_LOADER_H_ #define ANALYTICAL_ENGINE_CORE_LOADER_ARROW_FRAGMENT_LOADER_H_ #ifdef ENABLE_JAVA_SDK #include <jni.h> #endif #include <algorithm> #include <map> #include <memory> #include <set> #include <sstream> #include <string> #include <unordered_map> #include <utility> #include <vector> #include "boost/leaf/error.hpp" #include "boost/leaf/result.hpp" #include "grape/worker/comm_spec.h" #include "vineyard/basic/ds/arrow_utils.h" #include "vineyard/client/client.h" #include "vineyard/common/util/functions.h" #include "vineyard/graph/loader/arrow_fragment_loader.h" #include "vineyard/graph/loader/fragment_loader_utils.h" #include "vineyard/io/io/i_io_adaptor.h" #include "vineyard/io/io/io_factory.h" #include "core/error.h" #include "core/io/property_parser.h" #ifdef ENABLE_JAVA_SDK #include "core/java/java_loader_invoker.h" #endif #define HASH_PARTITION namespace bl = boost::leaf; namespace gs { /** * @brief This loader can load a ArrowFragment from the data source including * local file, oss, numpy, pandas and vineyard. * @tparam OID_T OID type * @tparam VID_T VID type */ template < typename OID_T = vineyard::property_graph_types::OID_TYPE, typename VID_T = vineyard::property_graph_types::VID_TYPE, template <typename OID_T_ = typename vineyard::InternalType<OID_T>::type, typename VID_T_ = VID_T> class VERTEX_MAP_T = vineyard::ArrowVertexMap> class ArrowFragmentLoader : public vineyard::ArrowFragmentLoader<OID_T, VID_T> { using Base = vineyard::ArrowFragmentLoader<OID_T, VID_T>; using typename Base::internal_oid_t; using typename Base::label_id_t; using typename Base::oid_array_t; using typename Base::oid_t; using typename Base::vid_t; using vertex_map_t = VERTEX_MAP_T<internal_oid_t, VID_T>; using Base::CONSOLIDATE_TAG; using Base::DST_LABEL_TAG; using Base::LABEL_TAG; using Base::MARKER; using Base::SRC_LABEL_TAG; using Base::id_column; using typename Base::partitioner_t; using typename Base::edge_table_info_t; using typename Base::oid_array_vec_t; using typename Base::vertex_table_info_t; using typename Base::vid_array_vec_t; // not sure why 'using' doesn't work for table_vec_t. using table_vec_t = std::vector<std::shared_ptr<arrow::Table>>; using Base::client_; using Base::comm_spec_; using Base::directed_; using Base::efiles_; using Base::generate_eid_; using Base::retain_oid_; using Base::vfiles_; public: ArrowFragmentLoader(vineyard::Client& client, const grape::CommSpec& comm_spec, const std::vector<std::string>& efiles, const std::vector<std::string>& vfiles, bool directed = true, bool generate_eid = false, bool retain_oid = false, bool compact_edges = false, bool use_perfect_hash = false) : Base(client, comm_spec, efiles, vfiles, directed, generate_eid, retain_oid, vineyard::is_local_vertex_map<vertex_map_t>::value, compact_edges, use_perfect_hash), graph_info_(nullptr), giraph_enabled_(false) {} ArrowFragmentLoader(vineyard::Client& client, const grape::CommSpec& comm_spec, std::shared_ptr<detail::Graph> graph_info) : Base(client, comm_spec, std::vector<std::string>{}, std::vector<std::string>{}, graph_info->directed, graph_info->generate_eid, graph_info->retain_oid, vineyard::is_local_vertex_map<vertex_map_t>::value, graph_info->compact_edges, graph_info->use_perfect_hash), graph_info_(graph_info) { #ifdef ENABLE_JAVA_SDK // check when vformat or eformat start with giraph. if not, we // giraph_enabled is false; giraph_enabled_ = false; for (auto v : graph_info_->vertices) { if (v->vformat.find("giraph") != std::string::npos) { giraph_enabled_ = true; } } for (auto e : graph_info_->edges) { for (auto sub : e->sub_labels) { if (sub.eformat.find("giraph") != std::string::npos) { giraph_enabled_ = true; } } } LOG(INFO) << "giraph enabled " << giraph_enabled_; if (giraph_enabled_) { java_loader_invoker_.SetWorkerInfo(comm_spec_.worker_id(), comm_spec_.worker_num(), comm_spec_); java_loader_invoker_.InitJavaLoader("giraph"); } #endif } ~ArrowFragmentLoader() = default; #ifdef ENABLE_JAVA_SDK JavaLoaderInvoker& GetJavaLoaderInvoker() { return java_loader_invoker_; } #endif boost::leaf::result<std::pair<table_vec_t, std::vector<table_vec_t>>> LoadVertexEdgeTables() { if (graph_info_) { std::stringstream labels; labels << "Loading "; if (graph_info_->vertices.empty() && graph_info_->edges.empty()) { labels << "empty graph"; } else { for (size_t i = 0; i < graph_info_->vertices.size(); ++i) { if (i == 0) { labels << "vertex labeled "; // prefix } else { labels << ", "; // label separator } labels << graph_info_->vertices[i]->label; } if (!graph_info_->vertices.empty()) { labels << " and "; } for (size_t i = 0; i < graph_info_->edges.size(); ++i) { if (i == 0) { labels << "edge labeled "; // prefix } else { labels << ", "; // label separator } labels << graph_info_->edges[i]->label; } } LOG_IF(INFO, !comm_spec_.worker_id()) << MARKER << "DESCRIPTION-" << labels.str(); } BOOST_LEAF_AUTO(v_tables, LoadVertexTables()); BOOST_LEAF_AUTO(e_tables, LoadEdgeTables()); return std::make_pair(v_tables, e_tables); } bl::result<table_vec_t> LoadVertexTables() { LOG_IF(INFO, !comm_spec_.worker_id()) << MARKER << "READ-VERTEX-0"; table_vec_t v_tables; if (!vfiles_.empty()) { auto load_v_procedure = [&]() { return loadVertexTables(vfiles_, comm_spec_.worker_id(), comm_spec_.worker_num()); }; BOOST_LEAF_ASSIGN(v_tables, vineyard::sync_gs_error(comm_spec_, load_v_procedure)); } else if (graph_info_) { auto load_v_procedure = [&]() { return loadVertexTables(graph_info_->vertices, comm_spec_.worker_id(), comm_spec_.worker_num()); }; BOOST_LEAF_ASSIGN(v_tables, vineyard::sync_gs_error(comm_spec_, load_v_procedure)); } for (const auto& table : v_tables) { BOOST_LEAF_CHECK(this->sanityChecks(table)); } LOG_IF(INFO, !comm_spec_.worker_id()) << MARKER << "READ-VERTEX-100"; return v_tables; } bl::result<std::vector<table_vec_t>> LoadEdgeTables() { LOG_IF(INFO, !comm_spec_.worker_id()) << MARKER << "READ-EDGE-0"; std::vector<table_vec_t> e_tables; if (!efiles_.empty()) { auto load_e_procedure = [&]() { return loadEdgeTables(efiles_, comm_spec_.worker_id(), comm_spec_.worker_num()); }; BOOST_LEAF_ASSIGN(e_tables, vineyard::sync_gs_error(comm_spec_, load_e_procedure)); } else if (graph_info_) { auto load_e_procedure = [&]() { return loadEdgeTables(graph_info_->edges, comm_spec_.worker_id(), comm_spec_.worker_num()); }; BOOST_LEAF_ASSIGN(e_tables, vineyard::sync_gs_error(comm_spec_, load_e_procedure)); } for (const auto& table_vec : e_tables) { for (const auto& table : table_vec) { BOOST_LEAF_CHECK(sanityChecks(table)); } } LOG_IF(INFO, !comm_spec_.worker_id()) << MARKER << "READ-EDGE-100"; return e_tables; } boost::leaf::result<vineyard::ObjectID> LoadFragment() { BOOST_LEAF_CHECK(initPartitioner()); BOOST_LEAF_AUTO(raw_v_e_tables, LoadVertexEdgeTables()); return Base::LoadFragment(std::move(raw_v_e_tables)); } boost::leaf::result<vineyard::ObjectID> LoadFragmentAsFragmentGroup() { BOOST_LEAF_AUTO(frag_id, LoadFragment()); auto frag = std::dynamic_pointer_cast<vineyard::ArrowFragmentBase>( client_.GetObject(frag_id)); if (frag == nullptr) { RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError, "fragment is null, means it is failed to be constructed"); } BOOST_LEAF_AUTO(group_id, ConstructFragmentGroup(client_, frag_id, comm_spec_)); return group_id; } bl::result<vineyard::ObjectID> AddLabelsToFragment( vineyard::ObjectID frag_id) { BOOST_LEAF_CHECK(initPartitioner()); BOOST_LEAF_AUTO(raw_v_e_tables, LoadVertexEdgeTables()); return Base::addVerticesAndEdges(frag_id, std::move(raw_v_e_tables)); } bl::result<vineyard::ObjectID> AddDataToExistedVLabel( vineyard::ObjectID frag_id, label_id_t label_id) { BOOST_LEAF_CHECK(initPartitioner()); BOOST_LEAF_AUTO(raw_v_e_tables, LoadVertexEdgeTables()); return Base::addDataToExistedVLabel(frag_id, label_id, std::move(raw_v_e_tables)); } bl::result<vineyard::ObjectID> AddDataToExistedELabel( vineyard::ObjectID frag_id, label_id_t label_id) { BOOST_LEAF_CHECK(initPartitioner()); BOOST_LEAF_AUTO(raw_v_e_tables, LoadVertexEdgeTables()); return Base::addDataToExistedELabel(frag_id, label_id, std::move(raw_v_e_tables)); } boost::leaf::result<vineyard::ObjectID> AddLabelsToFragmentAsFragmentGroup( vineyard::ObjectID frag_id) { BOOST_LEAF_AUTO(new_frag_id, AddLabelsToFragment(frag_id)); VY_OK_OR_RAISE(client_.Persist(new_frag_id)); return vineyard::ConstructFragmentGroup(client_, new_frag_id, comm_spec_); } bl::result<vineyard::ObjectID> ExtendLabelData(vineyard::ObjectID frag_id, int extend_type) { // find duplicate label id assert(extend_type); auto frag = std::dynamic_pointer_cast<vineyard::ArrowFragmentBase>( client_.GetObject(frag_id)); vineyard::PropertyGraphSchema schema = frag->schema(); std::vector<std::string> labels; label_id_t target_label_id = -1; if (extend_type == 1) labels = schema.GetVertexLabels(); else if (extend_type == 2) labels = schema.GetEdgeLabels(); std::map<std::string, label_id_t> label_set; for (size_t i = 0; i < labels.size(); ++i) label_set[labels[i]] = i; if (extend_type == 1) { for (size_t i = 0; i < graph_info_->vertices.size(); ++i) { auto it = label_set.find(graph_info_->vertices[i]->label); if (it != label_set.end()) { target_label_id = it->second; break; } } } else if (extend_type == 2) { for (size_t i = 0; i < graph_info_->edges.size(); ++i) { auto it = label_set.find(graph_info_->edges[i]->label); if (it != label_set.end()) { target_label_id = it->second; break; } } } else { RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError, "extend type is invalid"); } if (target_label_id == -1) RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError, "label not found"); vineyard::ObjectID new_frag_id; if (extend_type == 1) { BOOST_LEAF_ASSIGN(new_frag_id, AddDataToExistedVLabel(frag_id, target_label_id)); } else if (extend_type == 2) { BOOST_LEAF_ASSIGN(new_frag_id, AddDataToExistedELabel(frag_id, target_label_id)); } return vineyard::ConstructFragmentGroup(client_, new_frag_id, comm_spec_); } bl::result<void> initPartitioner() { #ifdef HASH_PARTITION Base::partitioner_.Init(comm_spec_.fnum()); #else if (vfiles_.empty() && (graph_info_ != nullptr && graph_info_->vertices.empty())) { RETURN_GS_ERROR( vineyard::ErrorCode::kInvalidOperationError, "Segmented partitioner is not supported when the v-file is " "not provided"); } table_vec_t vtables; if (graph_info_) { BOOST_LEAF_ASSIGN(vtables, loadVertexTables(graph_info_->vertices, 0, 1)); } else { BOOST_LEAF_ASSIGN(vtables, loadVertexTables(vfiles_, 0, 1)); } std::vector<oid_t> oid_list; for (auto& table : vtables) { std::shared_ptr<arrow::ChunkedArray> oid_array_chunks = table->column(id_column); size_t chunk_num = oid_array_chunks->num_chunks(); for (size_t chunk_i = 0; chunk_i != chunk_num; ++chunk_i) { std::shared_ptr<oid_array_t> array = std::dynamic_pointer_cast<oid_array_t>( oid_array_chunks->chunk(chunk_i)); int64_t length = array->length(); for (int64_t i = 0; i < length; ++i) { oid_list.emplace_back(oid_t(array->GetView(i))); } } } Base::partitioner_.Init(comm_spec_.fnum(), oid_list); #endif return {}; } private: using Base::loadEdgeTables; using Base::loadVertexTables; using Base::preprocessInputs; using Base::resolveVineyardObject; using Base::sanityChecks; #ifdef ENABLE_JAVA_SDK // Location like giraph://filename#input_format_class=className bl::result<std::shared_ptr<arrow::Table>> readTableFromGiraph( bool load_vertex, const std::string& file_path, int index, int total_parts, const std::string formatter) { if (giraph_enabled_) { if (load_vertex) { // There are cases both vertex and edges are specified in vertex file. // In this case, we load the data in this function, and suppose call // add_edges will be called(empty location), // if location is empty, we just return the previous loaded data. java_loader_invoker_.load_vertices_and_edges(file_path, formatter); return java_loader_invoker_.get_vertex_table(); } else { java_loader_invoker_.load_edges(file_path, formatter); return java_loader_invoker_.get_edge_table(); } } else { RETURN_GS_ERROR(vineyard::ErrorCode::kIOError, "Please enable giraph in constructor"); } // once set, we will read. } #endif bl::result<table_vec_t> loadVertexTables( const std::vector<std::shared_ptr<detail::Vertex>>& vertices, int index, int total_parts) { // a special code path when multiple labeled vertex batches are mixed: for // subgraph if (vertices.size() == 1 && vertices[0]->protocol == "vineyard") { VLOG(2) << "read vertex table from vineyard: " << vertices[0]->values; BOOST_LEAF_AUTO(sourceId, this->resolveVineyardObject(vertices[0]->values)); auto read_procedure = [&]() -> bl::result<table_vec_t> { BOOST_LEAF_AUTO(tables, vineyard::GatherVTables( client_, {sourceId}, comm_spec_.local_id(), comm_spec_.local_num())); if (tables.size() == 1 && tables[0] != nullptr) { std::shared_ptr<arrow::KeyValueMetadata> meta; if (tables[0]->schema()->metadata() == nullptr) { meta = std::make_shared<arrow::KeyValueMetadata>(); } else { meta = tables[0]->schema()->metadata()->Copy(); } if (meta->FindKey(LABEL_TAG) == -1) { meta->Append(LABEL_TAG, vertices[0]->label); } tables[0] = tables[0]->ReplaceSchemaMetadata(meta); } return tables; }; BOOST_LEAF_AUTO(tables, vineyard::sync_gs_error(comm_spec_, read_procedure)); auto sync_schema_procedure = [&](std::shared_ptr<arrow::Table> const& table) { return vineyard::SyncSchema(table, comm_spec_); }; table_vec_t normalized_tables; for (auto const& table : tables) { BOOST_LEAF_AUTO( normalized_table, vineyard::sync_gs_error(comm_spec_, sync_schema_procedure, table)); normalized_tables.emplace_back(normalized_table); } return normalized_tables; } size_t label_num = vertices.size(); table_vec_t tables(label_num); for (size_t i = 0; i < label_num; ++i) { auto read_procedure = [&]() -> bl::result<std::shared_ptr<arrow::Table>> { std::shared_ptr<arrow::Table> table; if (vertices[i]->protocol == "numpy" || vertices[i]->protocol == "pandas") { VY_OK_OR_RAISE( vineyard::ReadTableFromPandas(vertices[i]->values, table)); } else if (vertices[i]->protocol == "vineyard") { VLOG(2) << "read vertex table from vineyard: " << vertices[i]->values; BOOST_LEAF_AUTO(sourceId, this->resolveVineyardObject(vertices[i]->values)); VY_OK_OR_RAISE(vineyard::ReadTableFromVineyard( client_, sourceId, table, comm_spec_.local_id(), comm_spec_.local_num())); #ifdef ENABLE_JAVA_SDK } else if (vertices[i]->protocol == "file" && vertices[i]->vformat.find("giraph") != std::string::npos) { BOOST_LEAF_ASSIGN( table, readTableFromGiraph( true, vertices[i]->values, index, total_parts, vertices[i]->vformat)); // true means to load vertex. #endif } else { // Let the IOFactory to parse other protocols. auto path = vertices[i]->values; VY_OK_OR_RAISE(vineyard::ReadTableFromLocation( vertices[i]->values, table, index, total_parts)); } return table; }; BOOST_LEAF_AUTO(table, vineyard::sync_gs_error(comm_spec_, read_procedure)); auto sync_schema_procedure = [&]() { return vineyard::SyncSchema(table, comm_spec_); }; BOOST_LEAF_AUTO(normalized_table, vineyard::sync_gs_error( comm_spec_, sync_schema_procedure)); std::shared_ptr<arrow::KeyValueMetadata> meta( new arrow::KeyValueMetadata()); meta->Append(LABEL_TAG, vertices[i]->label); tables[i] = normalized_table->ReplaceSchemaMetadata(meta); } return tables; } bl::result<std::vector<table_vec_t>> loadEdgeTables( const std::vector<std::shared_ptr<detail::Edge>>& edges, int index, int total_parts) { // a special code path when multiple labeled edge batches are mixed. if (edges.size() == 1 && edges[0]->sub_labels.size() == 1 && edges[0]->sub_labels[0].protocol == "vineyard") { LOG(INFO) << "read edge table from vineyard: " << edges[0]->sub_labels[0].values; BOOST_LEAF_AUTO(sourceId, this->resolveVineyardObject( edges[0]->sub_labels[0].values)); auto read_procedure = [&]() -> bl::result<std::vector<table_vec_t>> { BOOST_LEAF_AUTO(tables, vineyard::GatherETables(client_, {{sourceId}}, comm_spec_.local_id(), comm_spec_.local_num())); if (tables.size() == 1 && tables[0].size() == 1 && tables[0][0] != nullptr) { std::shared_ptr<arrow::KeyValueMetadata> meta; if (tables[0][0]->schema()->metadata() == nullptr) { meta = std::make_shared<arrow::KeyValueMetadata>(); } else { meta = tables[0][0]->schema()->metadata()->Copy(); } if (meta->FindKey(LABEL_TAG) == -1 || meta->FindKey(SRC_LABEL_TAG) == -1 || meta->FindKey(DST_LABEL_TAG) == -1) { meta->Append(LABEL_TAG, edges[0]->label); meta->Append(SRC_LABEL_TAG, edges[0]->sub_labels[0].src_label); meta->Append(DST_LABEL_TAG, edges[0]->sub_labels[0].dst_label); } tables[0][0] = tables[0][0]->ReplaceSchemaMetadata(meta); } return tables; }; BOOST_LEAF_AUTO(tables, vineyard::sync_gs_error(comm_spec_, read_procedure)); auto sync_schema_procedure = [&](std::shared_ptr<arrow::Table> const& table) { return vineyard::SyncSchema(table, comm_spec_); }; std::vector<table_vec_t> normalized_tables; for (auto const& subtables : tables) { table_vec_t normalized_subtables; for (auto const& table : subtables) { BOOST_LEAF_AUTO(normalized_table, vineyard::sync_gs_error( comm_spec_, sync_schema_procedure, table)); normalized_subtables.emplace_back(normalized_table); } normalized_tables.emplace_back(normalized_subtables); } return normalized_tables; } size_t edge_type_num = edges.size(); std::vector<table_vec_t> tables(edge_type_num); for (size_t i = 0; i < edge_type_num; ++i) { auto sub_labels = edges[i]->sub_labels; for (size_t j = 0; j < sub_labels.size(); ++j) { std::shared_ptr<arrow::KeyValueMetadata> meta( new arrow::KeyValueMetadata()); meta->Append(LABEL_TAG, edges[i]->label); auto load_procedure = [&]() -> bl::result<std::shared_ptr<arrow::Table>> { std::shared_ptr<arrow::Table> table; if (sub_labels[j].protocol == "pandas") { VY_OK_OR_RAISE( vineyard::ReadTableFromPandas(sub_labels[j].values, table)); } else if (sub_labels[j].protocol == "vineyard") { LOG(INFO) << "read edge table from vineyard: " << sub_labels[j].values; BOOST_LEAF_AUTO(sourceId, this->resolveVineyardObject(sub_labels[j].values)); VY_OK_OR_RAISE(vineyard::ReadTableFromVineyard( client_, sourceId, table, comm_spec_.local_id(), comm_spec_.local_num())); if (table == nullptr) { VLOG(2) << "edge table is null"; } else { VLOG(2) << "schema of edge table: " << table->schema()->ToString(); } #ifdef ENABLE_JAVA_SDK } else if (sub_labels[j].protocol == "file" && sub_labels[j].eformat.find("giraph") != std::string::npos) { BOOST_LEAF_ASSIGN( table, readTableFromGiraph(false, sub_labels[j].values, index, total_parts, sub_labels[j].eformat)); #endif } else { // Let the IOFactory to parse other protocols. VY_OK_OR_RAISE(vineyard::ReadTableFromLocation( sub_labels[j].values, table, index, total_parts)); } return table; }; BOOST_LEAF_AUTO(table, vineyard::sync_gs_error(comm_spec_, load_procedure)); auto sync_schema_procedure = [&]() { return vineyard::SyncSchema(table, comm_spec_); }; BOOST_LEAF_AUTO( normalized_table, vineyard::sync_gs_error(comm_spec_, sync_schema_procedure)); meta->Append(SRC_LABEL_TAG, sub_labels[j].src_label); meta->Append(DST_LABEL_TAG, sub_labels[j].dst_label); tables[i].emplace_back(normalized_table->ReplaceSchemaMetadata(meta)); } } return tables; } std::shared_ptr<detail::Graph> graph_info_; bool giraph_enabled_; #ifdef ENABLE_JAVA_SDK JavaLoaderInvoker java_loader_invoker_; #endif }; namespace detail { template <typename OID_T, typename VID_T, typename VERTEX_MAP_T> struct rebind_arrow_fragment_loader; template <typename OID_T, typename VID_T> struct rebind_arrow_fragment_loader< OID_T, VID_T, vineyard::ArrowVertexMap<typename vineyard::InternalType<OID_T>::type, VID_T>> { using type = ArrowFragmentLoader<OID_T, VID_T, vineyard::ArrowVertexMap>; }; template <typename OID_T, typename VID_T> struct rebind_arrow_fragment_loader< OID_T, VID_T, vineyard::ArrowLocalVertexMap<typename vineyard::InternalType<OID_T>::type, VID_T>> { using type = ArrowFragmentLoader<OID_T, VID_T, vineyard::ArrowLocalVertexMap>; }; } // namespace detail template <typename OID_T, typename VID_T, typename VERTEX_MAP_T> using arrow_fragment_loader_t = typename detail::rebind_arrow_fragment_loader<OID_T, VID_T, VERTEX_MAP_T>::type; } // namespace gs #endif // ANALYTICAL_ENGINE_CORE_LOADER_ARROW_FRAGMENT_LOADER_H_