analytical_engine/core/io/property_parser.h (280 lines of code) (raw):

/** Copyright 2020 Alibaba Group Holding Limited. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef ANALYTICAL_ENGINE_CORE_IO_PROPERTY_PARSER_H_ #define ANALYTICAL_ENGINE_CORE_IO_PROPERTY_PARSER_H_ #include <glog/logging.h> #include <cstddef> #include <iostream> #include <map> #include <memory> #include <string> #include <utility> #include <vector> #include "arrow/buffer.h" #include "arrow/table.h" #include "boost/leaf/error.hpp" #include "boost/leaf/result.hpp" #include "vineyard/basic/ds/arrow_utils.h" #include "vineyard/common/util/status.h" #include "core/server/rpc_utils.h" #include "proto/attr_value.pb.h" #include "proto/types.pb.h" namespace bl = boost::leaf; namespace gs { namespace rpc { class OpDef; } } // namespace gs template <typename Key, typename Value> using ProtobufMap = ::google::protobuf::Map<Key, Value>; using gs::rpc::AttrValue; using gs::rpc::Chunk; using gs::rpc::DataType; using gs::rpc::GSParams; using gs::rpc::LargeAttrValue; using gs::rpc::OpDef; using AttrMap = ProtobufMap<int, AttrValue>; namespace gs { namespace detail { /** * @brief This is the model class to represent how to load vertex data from the * data source. */ struct Vertex { std::string label; // This field is used to set metadata of arrow table std::string vid; // when vid is single digit, it means column index of vertex // id. Otherwise, it represents column name std::string protocol; // file/oss/numpy/pandas/vineyard std::string values; // from location, vineyard or pandas std::string vformat; // defines vertex format, std::string SerializeToString() const { std::stringstream ss; ss << "V "; ss << label << " " << vid << " "; ss << protocol << " " << values << " "; ss << vformat << "\n"; return ss.str(); } }; /** * @brief This is the model class to represent how to load edge data from the * data source. */ class Edge { public: class SubLabel { public: std::string src_label, dst_label; std::string src_vid, dst_vid; std::string load_strategy; std::string protocol; std::string values; // This doesn't need to be guarded by eformat // eformat is not requires, initialized to empty. std::string eformat; std::string SerializeToString() const { std::stringstream ss; ss << src_label << " " << dst_label << " "; ss << src_vid << " " << dst_vid << " "; ss << protocol << " " << values << " "; ss << eformat; return ss.str(); } }; std::string label; std::vector<SubLabel> sub_labels; std::string SerializeToString() const { std::stringstream ss; ss << "E "; ss << label; for (auto& sub_label : sub_labels) { ss << sub_label.SerializeToString() << ";"; } ss << "\n"; return ss.str(); } }; /** * @brief This is the model class to represent the data source to load a graph */ struct Graph { std::vector<std::shared_ptr<Vertex>> vertices; std::vector<std::shared_ptr<Edge>> edges; bool directed = true; bool generate_eid = true; bool retain_oid = true; bool compact_edges = false; bool use_perfect_hash = false; // This is used to extend the label data // when user try to add data to existed labels. // the available option is 0/1/2, // 0 stands for no extend, // 1 stands for extend vertex label data, // 2 stands for extend edge label data. int extend_type = 0; std::string SerializeToString() const { std::stringstream ss; ss << "directed: " << directed << "\n"; ss << "generate_eid: " << generate_eid << "\n"; ss << "retain_oid: " << retain_oid << "\n"; ss << "compact_edges: " << compact_edges << "\n"; ss << "use_perfect_hash: " << use_perfect_hash << "\n"; for (auto& v : vertices) { ss << v->SerializeToString(); } for (auto& e : edges) { ss << e->SerializeToString(); } return ss.str(); } }; } // namespace detail inline void ParseVertex(std::shared_ptr<detail::Graph>& graph, const std::string& data, const AttrMap& attrs) { auto vertex = std::make_shared<detail::Vertex>(); vertex->label = attrs.at(rpc::LABEL).s(); vertex->vid = attrs.at(rpc::VID).s(); vertex->protocol = attrs.at(rpc::PROTOCOL).s(); if (attrs.find(rpc::VFORMAT) != attrs.end()) { vertex->vformat = attrs.at(rpc::VFORMAT).s(); } if (vertex->protocol == "pandas") { vertex->values = data; } else { vertex->values = attrs.at(rpc::SOURCE).s(); } graph->vertices.push_back(vertex); } inline void ParseEdge(std::shared_ptr<detail::Graph>& graph, const std::string& data, const AttrMap& attrs) { std::string label = attrs.at(rpc::LABEL).s(); bool has_edge_label = false; if (!graph->edges.empty() && graph->edges.back()->label == label) { has_edge_label = true; } auto edge = has_edge_label ? graph->edges.back() : std::make_shared<detail::Edge>(); edge->label = label; // sub_label: src_label / dst_label detail::Edge::SubLabel sub_label; sub_label.src_label = attrs.at(rpc::SRC_LABEL).s(); sub_label.dst_label = attrs.at(rpc::DST_LABEL).s(); sub_label.src_vid = attrs.at(rpc::SRC_VID).s(); sub_label.dst_vid = attrs.at(rpc::DST_VID).s(); sub_label.load_strategy = attrs.at(rpc::LOAD_STRATEGY).s(); sub_label.protocol = attrs.at(rpc::PROTOCOL).s(); if (attrs.find(rpc::EFORMAT) != attrs.end()) { sub_label.eformat = attrs.at(rpc::EFORMAT).s(); } if (sub_label.protocol == "pandas") { sub_label.values = data; } else { sub_label.values = attrs.at(rpc::SOURCE).s(); } edge->sub_labels.push_back(sub_label); if (!has_edge_label) { graph->edges.push_back(edge); } } // The input string is the serialized bytes of an arrow::Table, this function // split the table to several small tables. inline void SplitTable(const std::string& data, int num, std::vector<std::string>& sliced_bytes) { sliced_bytes.resize(num); std::shared_ptr<arrow::Buffer> buffer = arrow::Buffer::Wrap(data.data(), data.size()); std::shared_ptr<arrow::Table> table; VINEYARD_CHECK_OK(vineyard::DeserializeTable(buffer, &table)); std::vector<std::shared_ptr<arrow::Table>> sliced_tables(num); int num_rows = table->num_rows(); int offset = num_rows / num; int remainder = num_rows % num; int cur = 0; sliced_tables[0] = table->Slice(cur, offset + remainder); cur = offset + remainder; for (int i = 1; i < num; ++i) { auto sliced = table->Slice(cur, offset); sliced_tables[i] = sliced; cur += offset; } for (int i = 0; i < num; ++i) { if (sliced_tables[i]->num_rows() > 0) { std::shared_ptr<arrow::Buffer> out_buf; VINEYARD_CHECK_OK(vineyard::SerializeTable(sliced_tables[i], &out_buf)); sliced_bytes[i] = out_buf->ToString(); } } } inline void DistributeChunk(const rpc::Chunk& chunk, int num, std::vector<rpc::Chunk>& distributed_chunk) { distributed_chunk.resize(num); // Copy to a map to avoid the undefined reference issue (inside // protobuf's internal code: Map::at()') on MacOS std::map<int, rpc::AttrValue> params; for (auto& pair : chunk.attr()) { params[pair.first] = pair.second; } std::string protocol = params.at(rpc::PROTOCOL).s(); std::vector<std::string> distributed_values; const std::string& data = chunk.buffer(); if (protocol == "pandas") { SplitTable(data, num, distributed_values); } else { distributed_values.resize(num, params.at(rpc::SOURCE).s()); } for (int i = 0; i < num; ++i) { distributed_chunk[i].set_buffer(std::move(distributed_values[i])); auto* attr = distributed_chunk[i].mutable_attr(); for (auto& pair : params) { (*attr)[pair.first].CopyFrom(pair.second); } } } // If contains contents from numpy or pandas, then we should distribute those // raw bytes evenly across all workers, each worker would only receive a slice, // in order to reduce the communication overhead. inline std::vector<rpc::LargeAttrValue> DistributeGraph( const rpc::LargeAttrValue& large_attr, int num) { std::vector<rpc::LargeAttrValue> distributed_graph(num); if (large_attr.has_chunk_list()) { size_t chunk_list_size = large_attr.chunk_list().items().size(); std::vector<std::vector<rpc::Chunk>> distributed_vec(chunk_list_size); // split for (size_t i = 0; i < chunk_list_size; ++i) { DistributeChunk(large_attr.chunk_list().items(i), num, distributed_vec[i]); } for (int i = 0; i < num; ++i) { for (auto& vec : distributed_vec) { rpc::Chunk* chunk = distributed_graph[i].mutable_chunk_list()->add_items(); chunk->Swap(&vec[i]); } } } return distributed_graph; } inline bl::result<std::shared_ptr<detail::Graph>> ParseCreatePropertyGraph( const GSParams& params) { BOOST_LEAF_AUTO(directed, params.Get<bool>(rpc::DIRECTED)); BOOST_LEAF_AUTO(generate_eid, params.Get<bool>(rpc::GENERATE_EID)); BOOST_LEAF_AUTO(retain_oid, params.Get<bool>(rpc::RETAIN_OID)); BOOST_LEAF_AUTO(compact_edges, params.Get<bool>(rpc::COMPACT_EDGES, false)); BOOST_LEAF_AUTO(use_perfect_hash, params.Get<bool>(rpc::USE_PERFECT_HASH, false)); BOOST_LEAF_AUTO(extend_type, params.Get<int64_t>(rpc::EXTEND_LABEL_DATA, 0)); auto graph = std::make_shared<detail::Graph>(); graph->directed = directed; graph->generate_eid = generate_eid; graph->retain_oid = retain_oid; graph->compact_edges = compact_edges; graph->use_perfect_hash = use_perfect_hash; graph->extend_type = extend_type; const auto& large_attr = params.GetLargeAttr(); for (const auto& item : large_attr.chunk_list().items()) { const auto& chunk_attr = item.attr(); if (chunk_attr.at(rpc::CHUNK_NAME).s() == "vertex") { ParseVertex(graph, item.buffer(), chunk_attr); } else if (chunk_attr.at(rpc::CHUNK_NAME).s() == "edge") { ParseEdge(graph, item.buffer(), chunk_attr); } } return graph; } inline bl::result<std::vector<std::map<int, std::vector<int>>>> ParseProjectPropertyGraph(const GSParams& params) { BOOST_LEAF_AUTO(list, params.Get<rpc::AttrValue_ListValue>( rpc::ARROW_PROPERTY_DEFINITION)); auto& items = list.func(); std::map<int, std::vector<int>> vertices, edges; CHECK_EQ(items.size(), 2); { auto item = items[0]; for (auto& pair : item.attr()) { auto props = pair.second.list().i(); vertices[pair.first] = {props.begin(), props.end()}; } } { auto item = items[1]; for (auto& pair : item.attr()) { auto props = pair.second.list().i(); edges[pair.first] = {props.begin(), props.end()}; } } std::vector<std::map<int, std::vector<int>>> res; res.push_back(vertices); res.push_back(edges); return res; } } // namespace gs #endif // ANALYTICAL_ENGINE_CORE_IO_PROPERTY_PARSER_H_