analytical_engine/frame/project_frame.cc (216 lines of code) (raw):
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <memory>
#include <string>
#include "vineyard/basic/ds/arrow_utils.h"
#include "vineyard/common/util/typename.h"
#include "vineyard/graph/fragment/arrow_fragment.h"
#include "vineyard/graph/fragment/graph_schema.h"
#include "vineyard/graph/fragment/property_graph_types.h"
#include "vineyard/graph/utils/grape_utils.h"
#include "core/error.h"
#include "core/fragment/arrow_projected_fragment.h"
#include "core/fragment/dynamic_fragment.h"
#include "core/fragment/dynamic_projected_fragment.h"
#include "core/object/fragment_wrapper.h"
#include "core/server/rpc_utils.h"
#include "core/utils/fragment_traits.h"
#include "proto/attr_value.pb.h"
// The fragment type this frame is built for must be provided through the
// _PROJECTED_GRAPH_TYPE macro; compilation stops with an error otherwise.
// NOTE(review): presumably injected by the command that compiles the frame --
// confirm against the build scripts.
#if !defined(_PROJECTED_GRAPH_TYPE)
#error "_PROJECTED_GRAPH_TYPE is undefined"
#endif
// Short alias for boost::leaf (used for bl::result in this file).
namespace bl = boost::leaf;
/**
* project_frame.cc serves as a frame to be compiled with
* ArrowProjectedFragment/DynamicProjectedFragment. The frame is compiled
* when the client issues a PROJECT_TO_SIMPLE request, and a library is
* produced from it. The frame is needed because the template parameters
* are unknown until the project request has arrived at the analytical
* engine. A dynamic library is necessary to avoid hard-coding data types
* in the engine.
*/
namespace gs {
/**
 * Primary template of the projection frame, parameterized by the target
 * (projected) fragment type. It is deliberately empty; the Project() entry
 * point is provided by the specializations for concrete fragment types.
 */
template <typename FRAG_T>
class ProjectSimpleFrame {
  // Intentionally no members.
};
template <typename OID_T, typename VID_T, typename VDATA_T, typename EDATA_T,
typename VERTEX_MAP_T, bool COMPACT>
class ProjectSimpleFrame<gs::ArrowProjectedFragment<
OID_T, VID_T, VDATA_T, EDATA_T, VERTEX_MAP_T, COMPACT>> {
using fragment_t =
vineyard::ArrowFragment<OID_T, VID_T, VERTEX_MAP_T, COMPACT>;
using projected_fragment_t =
gs::ArrowProjectedFragment<OID_T, VID_T, VDATA_T, EDATA_T, VERTEX_MAP_T,
COMPACT>;
public:
__attribute__((visibility(
"hidden"))) static bl::result<std::shared_ptr<IFragmentWrapper>>
Project(std::shared_ptr<IFragmentWrapper>& input_wrapper,
const std::string& projected_graph_name,
const rpc::GSParams& params) {
auto graph_type = input_wrapper->graph_def().graph_type();
if (graph_type != rpc::graph::ARROW_PROPERTY) {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
"graph_type should be ARROW_PROPERTY, got " +
rpc::graph::GraphTypePb_Name(graph_type));
}
BOOST_LEAF_AUTO(v_label_id, params.Get<int64_t>(rpc::V_LABEL_ID));
BOOST_LEAF_AUTO(e_label_id, params.Get<int64_t>(rpc::E_LABEL_ID));
BOOST_LEAF_AUTO(v_prop_id, params.Get<int64_t>(rpc::V_PROP_ID));
BOOST_LEAF_AUTO(e_prop_id, params.Get<int64_t>(rpc::E_PROP_ID));
auto input_frag =
std::static_pointer_cast<fragment_t>(input_wrapper->fragment());
auto projected_frag = projected_fragment_t::Project(
input_frag, v_label_id, v_prop_id, e_label_id, e_prop_id);
rpc::graph::GraphDefPb graph_def;
graph_def.set_key(projected_graph_name);
graph_def.set_graph_type(rpc::graph::ARROW_PROJECTED);
graph_def.set_compact_edges(input_frag->compact_edges());
graph_def.set_use_perfect_hash(input_frag->use_perfect_hash());
gs::rpc::graph::VineyardInfoPb vy_info;
if (graph_def.has_extension()) {
graph_def.extension().UnpackTo(&vy_info);
}
vy_info.set_vineyard_id(projected_frag->id());
graph_def.mutable_extension()->PackFrom(vy_info);
setGraphDef(projected_frag, v_label_id, e_label_id, v_prop_id, e_prop_id,
graph_def);
auto wrapper = std::make_shared<FragmentWrapper<projected_fragment_t>>(
projected_graph_name, graph_def, projected_frag);
return std::dynamic_pointer_cast<IFragmentWrapper>(wrapper);
}
private:
static void setGraphDef(
std::shared_ptr<projected_fragment_t>& fragment,
vineyard::property_graph_types::LABEL_ID_TYPE const& v_label,
vineyard::property_graph_types::LABEL_ID_TYPE const& e_label,
vineyard::property_graph_types::PROP_ID_TYPE const& v_prop,
vineyard::property_graph_types::PROP_ID_TYPE const& e_prop,
rpc::graph::GraphDefPb& graph_def) {
auto& meta = fragment->meta();
const auto& parent_meta = meta.GetMemberMeta("arrow_fragment");
graph_def.set_directed(parent_meta.template GetKeyValue<bool>("directed_"));
graph_def.set_compact_edges(fragment->compact_edges());
graph_def.set_use_perfect_hash(fragment->use_perfect_hash());
gs::rpc::graph::VineyardInfoPb vy_info;
if (graph_def.has_extension()) {
graph_def.extension().UnpackTo(&vy_info);
}
vy_info.set_oid_type(PropertyTypeToPb(
vineyard::normalize_datatype(parent_meta.GetKeyValue("oid_type"))));
vy_info.set_vid_type(PropertyTypeToPb(
vineyard::normalize_datatype(parent_meta.GetKeyValue("vid_type"))));
vineyard::json schema_json;
parent_meta.GetKeyValue("schema_json_", schema_json);
vineyard::PropertyGraphSchema schema(schema_json);
std::string vdata_type, edata_type;
if (v_prop != -1) {
vdata_type =
vineyard::normalize_datatype(vineyard::type_name_from_arrow_type(
schema.GetVertexPropertyType(v_label, v_prop)));
} else {
vdata_type = vineyard::normalize_datatype("empty");
}
vy_info.set_vdata_type(PropertyTypeToPb(vdata_type));
if (e_prop != -1) {
edata_type =
vineyard::normalize_datatype(vineyard::type_name_from_arrow_type(
schema.GetEdgePropertyType(e_label, e_prop)));
} else {
edata_type = vineyard::normalize_datatype("empty");
}
vy_info.set_edata_type(PropertyTypeToPb(edata_type));
vy_info.set_property_schema_json("{}");
graph_def.mutable_extension()->PackFrom(vy_info);
}
};
/**
 * Frame specialization that projects the vineyard::ArrowFragment held by a
 * fragment wrapper into a gs::ArrowFlattenedFragment, selected by a vertex
 * property key and an edge property key.
 */
template <typename OID_T, typename VID_T, typename VDATA_T, typename EDATA_T>
class ProjectSimpleFrame<
    gs::ArrowFlattenedFragment<OID_T, VID_T, VDATA_T, EDATA_T>> {
  using fragment_t = vineyard::ArrowFragment<OID_T, VID_T>;
  using projected_fragment_t =
      gs::ArrowFlattenedFragment<OID_T, VID_T, VDATA_T, EDATA_T>;

 public:
  /**
   * @param input_wrapper Wrapper whose graph type must be ARROW_PROPERTY.
   * @param projected_graph_name Key stored in the resulting graph definition
   * and name given to the resulting wrapper.
   * @param params Must provide rpc::V_PROP_KEY and rpc::E_PROP_KEY as strings.
   * @return The wrapper of the flattened fragment, or an error when the graph
   * type does not match or a parameter lookup fails.
   */
  __attribute__((visibility("hidden")))
  static bl::result<std::shared_ptr<IFragmentWrapper>> Project(
      std::shared_ptr<IFragmentWrapper>& input_wrapper,
      const std::string& projected_graph_name, const rpc::GSParams& params) {
    const auto source_type = input_wrapper->graph_def().graph_type();
    if (source_type != rpc::graph::ARROW_PROPERTY) {
      RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                      "graph_type should be ARROW_PROPERTY, got " +
                          rpc::graph::GraphTypePb_Name(source_type));
    }

    // Each lookup returns early with the error of the failed lookup.
    BOOST_LEAF_AUTO(v_prop_key, params.Get<std::string>(rpc::V_PROP_KEY));
    BOOST_LEAF_AUTO(e_prop_key, params.Get<std::string>(rpc::E_PROP_KEY));

    auto source_frag =
        std::static_pointer_cast<fragment_t>(input_wrapper->fragment());
    auto flattened_frag =
        projected_fragment_t::Project(source_frag, v_prop_key, e_prop_key);

    // Describe the flattened graph.
    rpc::graph::GraphDefPb flattened_def;
    flattened_def.set_key(projected_graph_name);
    flattened_def.set_graph_type(rpc::graph::ARROW_FLATTENED);
    flattened_def.set_compact_edges(source_frag->compact_edges());
    flattened_def.set_use_perfect_hash(source_frag->use_perfect_hash());

    gs::rpc::graph::VineyardInfoPb vineyard_info;
    if (flattened_def.has_extension()) {
      flattened_def.extension().UnpackTo(&vineyard_info);
    }

    // Maps a C++ type name to its protobuf data type.
    const auto pb_type_of = [](const std::string& cpp_type_name) {
      return PropertyTypeToPb(vineyard::normalize_datatype(cpp_type_name));
    };
    vineyard_info.set_oid_type(pb_type_of(
        vineyard::type_name<typename projected_fragment_t::oid_t>()));
    vineyard_info.set_vid_type(pb_type_of(
        vineyard::type_name<typename projected_fragment_t::vid_t>()));
    vineyard_info.set_vdata_type(pb_type_of(
        vineyard::type_name<typename projected_fragment_t::vdata_t>()));
    vineyard_info.set_edata_type(pb_type_of(
        vineyard::type_name<typename projected_fragment_t::edata_t>()));
    flattened_def.mutable_extension()->PackFrom(vineyard_info);

    return std::dynamic_pointer_cast<IFragmentWrapper>(
        std::make_shared<FragmentWrapper<projected_fragment_t>>(
            projected_graph_name, flattened_def, flattened_frag));
  }
};
#ifdef NETWORKX
/**
 * Frame specialization that projects the DynamicFragment held by a fragment
 * wrapper into a gs::DynamicProjectedFragment, selected by a vertex property
 * key and an edge property key.
 */
template <typename VDATA_T, typename EDATA_T>
class ProjectSimpleFrame<gs::DynamicProjectedFragment<VDATA_T, EDATA_T>> {
  using fragment_t = DynamicFragment;
  using projected_fragment_t = gs::DynamicProjectedFragment<VDATA_T, EDATA_T>;

 public:
  /**
   * @param input_wrapper Wrapper whose graph type must be DYNAMIC_PROPERTY.
   * @param projected_graph_name Key stored in the resulting graph definition
   * and name given to the resulting wrapper.
   * @param params Must provide rpc::V_PROP_KEY and rpc::E_PROP_KEY as strings.
   * @return The wrapper of the projected fragment, or an error when the graph
   * type does not match or a parameter lookup fails.
   */
  __attribute__((visibility("hidden")))
  static bl::result<std::shared_ptr<IFragmentWrapper>> Project(
      std::shared_ptr<IFragmentWrapper>& input_wrapper,
      const std::string& projected_graph_name, const rpc::GSParams& params) {
    const auto source_type = input_wrapper->graph_def().graph_type();
    if (source_type != rpc::graph::DYNAMIC_PROPERTY) {
      RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                      "graph_type should be DYNAMIC_PROPERTY, got " +
                          rpc::graph::GraphTypePb_Name(source_type));
    }

    // Each lookup returns early with the error of the failed lookup.
    BOOST_LEAF_AUTO(v_prop_key, params.Get<std::string>(rpc::V_PROP_KEY));
    BOOST_LEAF_AUTO(e_prop_key, params.Get<std::string>(rpc::E_PROP_KEY));

    auto source_frag =
        std::static_pointer_cast<fragment_t>(input_wrapper->fragment());
    auto dynamic_frag =
        projected_fragment_t::Project(source_frag, v_prop_key, e_prop_key);

    // Describe the projected graph; both flags are always false here.
    rpc::graph::GraphDefPb dynamic_def;
    dynamic_def.set_key(projected_graph_name);
    dynamic_def.set_graph_type(rpc::graph::DYNAMIC_PROJECTED);
    dynamic_def.set_compact_edges(false);
    dynamic_def.set_use_perfect_hash(false);

    gs::rpc::graph::MutableGraphInfoPb mutable_info;
    if (dynamic_def.has_extension()) {
      dynamic_def.extension().UnpackTo(&mutable_info);
    }

    // Maps a C++ type name to its protobuf data type.
    const auto pb_type_of = [](const std::string& cpp_type_name) {
      return PropertyTypeToPb(vineyard::normalize_datatype(cpp_type_name));
    };
    mutable_info.set_vdata_type(pb_type_of(
        vineyard::type_name<typename projected_fragment_t::vdata_t>()));
    mutable_info.set_edata_type(pb_type_of(
        vineyard::type_name<typename projected_fragment_t::edata_t>()));
    dynamic_def.mutable_extension()->PackFrom(mutable_info);

    return std::dynamic_pointer_cast<IFragmentWrapper>(
        std::make_shared<FragmentWrapper<projected_fragment_t>>(
            projected_graph_name, dynamic_def, dynamic_frag));
  }
};
#endif
} // namespace gs
extern "C" {
// C-linkage entry point of the frame. Forwards its arguments to
// gs::ProjectSimpleFrame<_PROJECTED_GRAPH_TYPE>::Project and delivers the
// outcome through `wrapper_out` rather than a return value.
//
//   wrapper_in            wrapper of the graph to be projected.
//   projected_graph_name  name forwarded for the projected graph.
//   params                request parameters forwarded to the frame.
//   wrapper_out           receives the result (wrapper or error).
//
// NOTE(review): __FRAME_CATCH_AND_ASSIGN_GS_ERROR is defined outside this
// file; judging by its name it presumably also converts exceptions thrown by
// the call into an error stored in wrapper_out -- confirm in its definition.
void Project(std::shared_ptr<gs::IFragmentWrapper>& wrapper_in,
const std::string& projected_graph_name,
const gs::rpc::GSParams& params,
bl::result<std::shared_ptr<gs::IFragmentWrapper>>& wrapper_out) {
__FRAME_CATCH_AND_ASSIGN_GS_ERROR(
wrapper_out, gs::ProjectSimpleFrame<_PROJECTED_GRAPH_TYPE>::Project(
wrapper_in, projected_graph_name, params));
}
// Explicit instantiation of the fragment type named by _PROJECTED_GRAPH_TYPE.
template class _PROJECTED_GRAPH_TYPE;
} // extern "C"