analytical_engine/core/grape_instance.h (172 lines of code) (raw):

/** Copyright 2020 Alibaba Group Holding Limited. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef ANALYTICAL_ENGINE_CORE_GRAPE_INSTANCE_H_ #define ANALYTICAL_ENGINE_CORE_GRAPE_INSTANCE_H_ #include <glog/logging.h> #include <mpi.h> #include <iosfwd> #include <map> #include <memory> #include <string> #include <utility> #include "boost/foreach.hpp" #include "boost/leaf/error.hpp" #include "boost/leaf/result.hpp" #include "boost/property_tree/json_parser.hpp" #include "boost/property_tree/ptree.hpp" #include "grape/communication/sync_comm.h" #include "grape/config.h" #include "grape/worker/comm_spec.h" #include "vineyard/graph/utils/grape_utils.h" #include "core/object/object_manager.h" #include "core/server/dispatcher.h" #include "core/server/rpc_utils.h" #include "proto/types.pb.h" namespace bl = boost::leaf; namespace grape { class InArchive; } // namespace grape namespace vineyard { class Client; } // namespace vineyard namespace gs { class IContextWrapper; namespace rpc { namespace graph { class GraphDefPb; } // namespace graph } // namespace rpc struct CommandDetail; /** * @brief EngineConfig contains configurations about the analytical engine, such * as networkx features in enabled or not, vineyard socket, and vineyard rpc * endpoint. */ struct EngineConfig { std::string networkx; std::string vineyard_socket; std::string vineyard_rpc_endpoint; std::string enable_java_sdk; std::string ToJsonString() const { boost::property_tree::ptree pt; pt.put("networkx", networkx); pt.put("vineyard_socket", vineyard_socket); pt.put("vineyard_rpc_endpoint", vineyard_rpc_endpoint); pt.put("enable_java_sdk", enable_java_sdk); std::stringstream ss; boost::property_tree::json_parser::write_json(ss, pt); return ss.str(); } }; /** @brief MPI management. * * This controller initials MPI communication world, assign a rank to each * process. According to the assigned rank, determines whether this process runs * as a coordinator or a worker. It also in charges of execute commands from * coordinator. */ class GrapeInstance : public Subscriber { public: explicit GrapeInstance(const grape::CommSpec& comm_spec); void Init(const std::string& vineyard_socket); bl::result<std::shared_ptr<DispatchResult>> OnReceive( std::shared_ptr<CommandDetail> cmd) override; private: bl::result<rpc::graph::GraphDefPb> loadGraph(const rpc::GSParams& params); bl::result<void> unloadGraph(const rpc::GSParams& params); bl::result<void> archiveGraph(const rpc::GSParams& params); bl::result<std::string> loadApp(const rpc::GSParams& params); bl::result<void> unloadApp(const rpc::GSParams& params); bl::result<std::string> query(const rpc::GSParams& params, const rpc::QueryArgs& query_args); bl::result<void> unloadContext(const rpc::GSParams& params); bl::result<std::shared_ptr<grape::InArchive>> reportGraph( const rpc::GSParams& params); bl::result<rpc::graph::GraphDefPb> projectGraph(const rpc::GSParams& params); bl::result<rpc::graph::GraphDefPb> projectToSimple( const rpc::GSParams& params); bl::result<rpc::graph::GraphDefPb> modifyVertices( const rpc::GSParams& params); bl::result<rpc::graph::GraphDefPb> modifyEdges(const rpc::GSParams& params); bl::result<void> clearEdges(const rpc::GSParams& params); bl::result<void> clearGraph(const rpc::GSParams& params); bl::result<std::shared_ptr<grape::InArchive>> contextToNumpy( const rpc::GSParams& params); bl::result<std::shared_ptr<grape::InArchive>> contextToDataframe( const rpc::GSParams& params); bl::result<std::string> contextToVineyardTensor(const rpc::GSParams& params); bl::result<std::string> contextToVineyardDataFrame( const rpc::GSParams& params); bl::result<void> outputContext(const rpc::GSParams& params); bl::result<std::string> output(const rpc::GSParams& params); bl::result<rpc::graph::GraphDefPb> addColumn(const rpc::GSParams& params); bl::result<rpc::graph::GraphDefPb> convertGraph(const rpc::GSParams& params); bl::result<rpc::graph::GraphDefPb> copyGraph(const rpc::GSParams& params); bl::result<rpc::graph::GraphDefPb> toDirected(const rpc::GSParams& params); bl::result<rpc::graph::GraphDefPb> toUnDirected(const rpc::GSParams& params); bl::result<rpc::graph::GraphDefPb> createGraphView( const rpc::GSParams& params); #ifdef NETWORKX bl::result<rpc::graph::GraphDefPb> induceSubGraph( const rpc::GSParams& params); #endif // NETWORKX bl::result<rpc::graph::GraphDefPb> addLabelsToGraph( const rpc::GSParams& params); bl::result<rpc::graph::GraphDefPb> consolidateColumns( const rpc::GSParams& params); bl::result<std::string> getContextData(const rpc::GSParams& params); bl::result<std::shared_ptr<grape::InArchive>> graphToNumpy( const rpc::GSParams& params); bl::result<std::shared_ptr<grape::InArchive>> graphToDataframe( const rpc::GSParams& params); bl::result<void> registerGraphType(const rpc::GSParams& params); bl::result<void> getContextDetails( const rpc::GSParams& params, std::string* s_selector, std::pair<std::string, std::string>* range, std::shared_ptr<IContextWrapper>* wrapper) { if (params.HasKey(rpc::SELECTOR)) { BOOST_LEAF_ASSIGN(*s_selector, params.Get<std::string>(rpc::SELECTOR)); } if (params.HasKey(rpc::VERTEX_RANGE)) { BOOST_LEAF_AUTO(range_in_json, params.Get<std::string>(rpc::VERTEX_RANGE)); *range = parseRange(range_in_json); } BOOST_LEAF_AUTO(context_key, params.Get<std::string>(rpc::CONTEXT_KEY)); BOOST_LEAF_ASSIGN(*wrapper, object_manager_.GetObject<IContextWrapper>(context_key)); return {}; } static std::string toJson(const std::map<std::string, std::string>& map) { boost::property_tree::ptree pt; for (auto& e : map) { pt.put(e.first, e.second); } std::stringstream ss; boost::property_tree::json_parser::write_json(ss, pt); return ss.str(); } static std::pair<std::string, std::string> parseRange( const std::string& range) { // format: "{begin: a, end: b}" or "{begin: a}" or "{end: b}" or "{}" std::stringstream ss(range); boost::property_tree::ptree pt; std::string begin; std::string end; try { boost::property_tree::json_parser::read_json(ss, pt); BOOST_FOREACH // NOLINT(whitespace/parens) (boost::property_tree::ptree::value_type & v, pt) { CHECK(v.second.empty()); if (v.first == "begin") { begin = v.second.data(); } if (v.first == "end") { end = v.second.data(); } } } catch (boost::property_tree::ptree_error& e) { begin = ""; end = ""; } return std::make_pair(begin, end); } std::string generateId() { std::string id; if (comm_spec_.worker_id() == grape::kCoordinatorRank) { id = vineyard::random_string(8); } grape::sync_comm::Bcast(id, grape::kCoordinatorRank, MPI_COMM_WORLD); return id; } grape::CommSpec comm_spec_; ObjectManager object_manager_; std::shared_ptr<vineyard::Client> client_; }; } // namespace gs #endif // ANALYTICAL_ENGINE_CORE_GRAPE_INSTANCE_H_