analytical_engine/core/context/vertex_property_context.h (424 lines of code) (raw):
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef ANALYTICAL_ENGINE_CORE_CONTEXT_VERTEX_PROPERTY_CONTEXT_H_
#define ANALYTICAL_ENGINE_CORE_CONTEXT_VERTEX_PROPERTY_CONTEXT_H_
#include <mpi.h>
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "boost/leaf/error.hpp"
#include "boost/leaf/result.hpp"
#include "grape/app/context_base.h"
#include "grape/serialization/in_archive.h"
#include "grape/worker/comm_spec.h"
#include "vineyard/basic/ds/dataframe.h"
#include "vineyard/client/client.h"
#include "vineyard/common/util/uuid.h"
#include "vineyard/graph/fragment/property_graph_types.h"
#include "vineyard/graph/utils/context_protocols.h"
#include "core/context/column.h"
#include "core/context/context_protocols.h"
#include "core/context/i_context.h"
#include "core/context/selector.h"
#include "core/context/tensor_dataframe_builder.h"
#include "core/error.h"
#include "core/utils/mpi_utils.h"
#include "core/utils/transform_utils.h"
#define CONTEXT_TYPE_VERTEX_PROPERTY "vertex_property"
namespace bl = boost::leaf;
namespace arrow {
class Array;
}
namespace gs {
class IFragmentWrapper;
/**
 * @brief VertexPropertyContext can hold any number of columns over the inner
 * vertices of a non-labeled fragment. Compared with VertexDataContext, the data
 * type and column count can be determined at runtime.
 *
 * @tparam FRAG_T The fragment class (non-labeled fragment only)
 */
template <typename FRAG_T>
class VertexPropertyContext : public grape::ContextBase {
 public:
  using fragment_t = FRAG_T;
  using vertex_t = typename fragment_t::vertex_t;
  using prop_id_t = vineyard::property_graph_types::PROP_ID_TYPE;
  using oid_t = typename fragment_t::oid_t;

  /**
   * @param fragment Fragment whose inner vertices every column is created
   * over. Only a reference is stored, so the fragment must outlive this
   * context.
   */
  explicit VertexPropertyContext(const fragment_t& fragment)
      : fragment_(fragment) {}

  /** @brief Returns the fragment this context was constructed with. */
  const fragment_t& fragment() const { return fragment_; }

  /**
   * @brief Creates a new column named @p name of data type @p type, covering
   * the fragment's inner vertices.
   *
   * @return The index of the new column in vertex_properties(), or -1 if a
   * column with the same name already exists (nothing is created then).
   */
  int64_t add_column(const std::string& name, ContextDataType type) {
    if (properties_map_.find(name) != properties_map_.end()) {
      return -1;
    }
    auto column =
        CreateColumn<fragment_t>(name, fragment_.InnerVertices(), type);
    auto index = static_cast<int64_t>(vertex_properties_.size());
    properties_map_.emplace(name, column);
    vertex_properties_.emplace_back(std::move(column));
    return index;
  }

  /**
   * @brief Returns the column at @p index, or nullptr if @p index is negative
   * or out of range.
   */
  std::shared_ptr<IColumn> get_column(int64_t index) {
    if (index < 0 || static_cast<size_t>(index) >= vertex_properties_.size()) {
      return nullptr;
    }
    return vertex_properties_[index];
  }

  /** @brief Returns the column named @p name, or nullptr if there is none. */
  std::shared_ptr<IColumn> get_column(const std::string& name) {
    auto iter = properties_map_.find(name);
    if (iter == properties_map_.end()) {
      return nullptr;
    }
    return iter->second;
  }

  /**
   * @brief Returns the column at @p index downcast to its concrete type.
   *
   * @return nullptr if the index is invalid or the column's runtime type does
   * not match DATA_T.
   */
  template <typename DATA_T>
  std::shared_ptr<Column<fragment_t, DATA_T>> get_typed_column(int64_t index) {
    auto ret = get_column(index);
    if (ret == nullptr || ret->type() != ContextTypeToEnum<DATA_T>::value) {
      return nullptr;
    }
    return std::dynamic_pointer_cast<Column<fragment_t, DATA_T>>(ret);
  }

  /**
   * @brief Returns the column named @p name downcast to its concrete type.
   *
   * @return nullptr if no such column exists or its runtime type does not
   * match DATA_T.
   */
  template <typename DATA_T>
  std::shared_ptr<Column<fragment_t, DATA_T>> get_typed_column(
      const std::string& name) {
    auto ret = get_column(name);
    if (ret == nullptr || ret->type() != ContextTypeToEnum<DATA_T>::value) {
      return nullptr;
    }
    return std::dynamic_pointer_cast<Column<fragment_t, DATA_T>>(ret);
  }

  /** @brief All columns in creation order (indices match add_column()). */
  std::vector<std::shared_ptr<IColumn>>& vertex_properties() {
    return vertex_properties_;
  }

  /** @brief Read-only view of all columns in creation order. */
  const std::vector<std::shared_ptr<IColumn>>& vertex_properties() const {
    return vertex_properties_;
  }

  /** @brief Columns keyed by their name. */
  const std::map<std::string, std::shared_ptr<IColumn>>& properties_map()
      const {
    return properties_map_;
  }

 private:
  const fragment_t& fragment_;
  // The same column objects are reachable both by index and by name.
  std::vector<std::shared_ptr<IColumn>> vertex_properties_;
  std::map<std::string, std::shared_ptr<IColumn>> properties_map_;
};
/**
* @brief VertexPropertyContextWrapper is the wrapper class of
* VertexPropertyContext for serializing the data.
*
* @tparam FRAG_T The fragment class (non-labeled fragment only)
*/
template <typename FRAG_T>
class VertexPropertyContextWrapper : public IVertexPropertyContextWrapper {
  using fragment_t = FRAG_T;
  using vdata_t = typename fragment_t::vdata_t;
  using oid_t = typename fragment_t::oid_t;
  using context_t = VertexPropertyContext<fragment_t>;

 public:
  VertexPropertyContextWrapper(const std::string& id,
                               std::shared_ptr<IFragmentWrapper> frag_wrapper,
                               std::shared_ptr<context_t> context)
      : IVertexPropertyContextWrapper(id),
        frag_wrapper_(std::move(frag_wrapper)),
        ctx_(std::move(context)) {}

  std::string context_type() override { return CONTEXT_TYPE_VERTEX_PROPERTY; }

  /**
   * @brief Returns the names of all context columns, each followed by a comma
   * (e.g. "a,b,"), in the map's key order.
   */
  std::string schema() override {
    std::string ret;
    // Bind by reference so the map (and its shared_ptrs) is not copied.
    for (const auto& pair : ctx_->properties_map()) {
      ret += pair.first;
      ret += ',';
    }
    return ret;
  }

  std::shared_ptr<IFragmentWrapper> fragment_wrapper() override {
    return frag_wrapper_;
  }

  /**
   * @brief Serializes one selected column of the vertices chosen by @p range
   * into an archive describing a 1-D ndarray.
   *
   * Must be called by every worker, since it performs an MPI_Reduce and
   * gather_archives. Only fragment 0 writes the header:
   * [shape size = 1][total rows][element type id][total rows].
   * Every worker then appends its local values, and the bytes written after
   * the header are handed to gather_archives.
   *
   * @param comm_spec Communication spec of the participating workers.
   * @param selector Vertex id, vertex data, or a context result column
   *   (looked up by property name).
   * @param range Range forwarded to TransformUtils::SelectVertices.
   * @return The archive, or an error if the selector type is unsupported or
   *   the named property is not in the context.
   */
  bl::result<std::unique_ptr<grape::InArchive>> ToNdArray(
      const grape::CommSpec& comm_spec, const Selector& selector,
      const std::pair<std::string, std::string>& range) override {
    auto& frag = ctx_->fragment();
    TransformUtils<FRAG_T> trans_utils(comm_spec, frag);
    auto vertices = trans_utils.SelectVertices(range);
    auto local_num = static_cast<int64_t>(vertices.size());
    // Holds the global row count only on fragment 0 (the reduction root).
    int64_t total_num = 0;
    // Offset in the archive where this worker's payload begins.
    size_t old_size = 0;
    auto arc = std::make_unique<grape::InArchive>();

    if (comm_spec.fid() == 0) {
      MPI_Reduce(&local_num, &total_num, 1, MPI_INT64_T, MPI_SUM,
                 comm_spec.worker_id(), comm_spec.comm());
      *arc << static_cast<int64_t>(1);  // shape size
      *arc << total_num;
    } else {
      MPI_Reduce(&local_num, nullptr, 1, MPI_INT64_T, MPI_SUM,
                 comm_spec.FragToWorker(0), comm_spec.comm());
    }

    switch (selector.type()) {
    case SelectorType::kVertexId: {
      BOOST_LEAF_AUTO(type_id, trans_utils.GetOidTypeId());
      if (comm_spec.fid() == 0) {
        *arc << static_cast<int>(type_id);
        *arc << total_num;
      }
      old_size = arc->GetSize();
      trans_utils.SerializeVertexId(vertices, *arc);
      break;
    }
    case SelectorType::kVertexData: {
      if (comm_spec.fid() == 0) {
        *arc << static_cast<int>(vineyard::TypeToInt<vdata_t>::value);
        *arc << total_num;
      }
      old_size = arc->GetSize();
      trans_utils.SerializeVertexData(vertices, *arc);
      break;
    }
    case SelectorType::kResult: {
      const auto& prop_name = selector.property_name();
      const auto& properties_map = ctx_->properties_map();
      auto iter = properties_map.find(prop_name);
      if (iter == properties_map.end()) {
        RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                        "Property " + prop_name + " not found in context.");
      }
      auto column = iter->second;
      if (comm_spec.fid() == 0) {
        *arc << ContextDataTypeToInt(column->type());
        *arc << total_num;
      }
      old_size = arc->GetSize();
      BOOST_LEAF_CHECK(
          serialize_context_property<FRAG_T>(*arc, vertices, column));
      break;
    }
    default:
      RETURN_GS_ERROR(
          vineyard::ErrorCode::kUnsupportedOperationError,
          "Unsupported operation, available selector type: vid,vdata "
          "and result. selector: " +
              selector.str());
    }
    gather_archives(*arc, comm_spec, old_size);
    return arc;
  }

  /**
   * @brief Serializes several selected columns of the vertices chosen by
   * @p range into a dataframe archive.
   *
   * Must be called by every worker (MPI_Reduce plus one gather_archives per
   * column). Fragment 0 writes [column count][total rows], then for each
   * column [column name][element type id]. Every worker appends its local
   * values for that column, and the bytes after the column header are handed
   * to gather_archives.
   *
   * @param comm_spec Communication spec of the participating workers.
   * @param selectors (output column name, selector) pairs, in output order.
   * @param range Range forwarded to TransformUtils::SelectVertices.
   * @return The archive, or an error if a selector type is unsupported or a
   *   named property is not in the context.
   */
  bl::result<std::unique_ptr<grape::InArchive>> ToDataframe(
      const grape::CommSpec& comm_spec,
      const std::vector<std::pair<std::string, Selector>>& selectors,
      const std::pair<std::string, std::string>& range) override {
    auto& frag = ctx_->fragment();
    TransformUtils<FRAG_T> trans_utils(comm_spec, frag);
    auto vertices = trans_utils.SelectVertices(range);
    auto local_num = static_cast<int64_t>(vertices.size());
    auto arc = std::make_unique<grape::InArchive>();

    if (comm_spec.fid() == 0) {
      int64_t total_num = 0;
      MPI_Reduce(&local_num, &total_num, 1, MPI_INT64_T, MPI_SUM,
                 comm_spec.worker_id(), comm_spec.comm());
      *arc << static_cast<int64_t>(selectors.size());
      *arc << total_num;
    } else {
      MPI_Reduce(&local_num, nullptr, 1, MPI_INT64_T, MPI_SUM,
                 comm_spec.FragToWorker(0), comm_spec.comm());
    }

    for (const auto& pair : selectors) {
      const auto& col_name = pair.first;
      const auto& selector = pair.second;
      if (comm_spec.fid() == 0) {
        *arc << col_name;
      }
      // Offset in the archive where this worker's column payload begins.
      size_t old_size = 0;
      switch (selector.type()) {
      case SelectorType::kVertexId: {
        BOOST_LEAF_AUTO(type_id, trans_utils.GetOidTypeId());
        if (comm_spec.fid() == 0) {
          *arc << static_cast<int>(type_id);
        }
        old_size = arc->GetSize();
        trans_utils.SerializeVertexId(vertices, *arc);
        break;
      }
      case SelectorType::kVertexData: {
        if (comm_spec.fid() == 0) {
          *arc << static_cast<int>(vineyard::TypeToInt<vdata_t>::value);
        }
        old_size = arc->GetSize();
        trans_utils.SerializeVertexData(vertices, *arc);
        break;
      }
      case SelectorType::kResult: {
        const auto& prop_name = selector.property_name();
        const auto& properties_map = ctx_->properties_map();
        auto iter = properties_map.find(prop_name);
        if (iter == properties_map.end()) {
          RETURN_GS_ERROR(
              vineyard::ErrorCode::kInvalidValueError,
              "Property " + prop_name + " can not found in context.");
        }
        auto column = iter->second;
        if (comm_spec.fid() == 0) {
          *arc << ContextDataTypeToInt(column->type());
        }
        old_size = arc->GetSize();
        BOOST_LEAF_CHECK(
            serialize_context_property<FRAG_T>(*arc, vertices, column));
        break;
      }
      default:
        RETURN_GS_ERROR(
            vineyard::ErrorCode::kUnsupportedOperationError,
            "Unsupported operation, available selector type: vid,vdata "
            "and result. selector: " +
                selector.str());
      }
      gather_archives(*arc, comm_spec, old_size);
    }
    return arc;
  }

  /**
   * @brief Builds one local vineyard tensor chunk for the selected column and
   * registers it in a global tensor through MPIGlobalTensorBuilder.
   *
   * Must be called by every worker: it performs MPI_Allreduce to compute the
   * global shape.
   *
   * @return The id of the sealed global tensor, or an error if the selector
   *   type is unsupported or the named property is not in the context.
   */
  bl::result<vineyard::ObjectID> ToVineyardTensor(
      const grape::CommSpec& comm_spec, vineyard::Client& client,
      const Selector& selector,
      const std::pair<std::string, std::string>& range) override {
    auto& frag = ctx_->fragment();
    TransformUtils<FRAG_T> trans_utils(comm_spec, frag);
    auto vertices = trans_utils.SelectVertices(range);
    size_t local_num = vertices.size();
    size_t total_num = 0;
    MPI_Allreduce(&local_num, &total_num, 1, MPI_SIZE_T, MPI_SUM,
                  comm_spec.comm());

    vineyard::ObjectID tensor_chunk_id;
    switch (selector.type()) {
    case SelectorType::kVertexId: {
      BOOST_LEAF_ASSIGN(tensor_chunk_id,
                        trans_utils.VertexIdToVYTensor(client, vertices));
      break;
    }
    case SelectorType::kVertexData: {
      BOOST_LEAF_ASSIGN(tensor_chunk_id,
                        trans_utils.VertexDataToVYTensor(client, vertices));
      break;
    }
    case SelectorType::kResult: {
      const auto& prop_name = selector.property_name();
      const auto& properties_map = ctx_->properties_map();
      auto iter = properties_map.find(prop_name);
      if (iter == properties_map.end()) {
        RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                        "Property " + prop_name + " can not found in context.");
      }
      auto column = iter->second;
      BOOST_LEAF_ASSIGN(tensor_chunk_id,
                        column_to_vy_tensor<FRAG_T>(client, column, vertices));
      break;
    }
    default:
      RETURN_GS_ERROR(
          vineyard::ErrorCode::kUnsupportedOperationError,
          "Unsupported operation, available selector type: vid,vdata "
          "and result. selector: " +
              selector.str());
    }

    MPIGlobalTensorBuilder builder(client, comm_spec);
    builder.set_shape({static_cast<int64_t>(total_num)});
    builder.set_partition_shape({static_cast<int64_t>(frag.fnum())});
    builder.AddChunk(tensor_chunk_id);
    auto vy_obj = builder.Seal(client);
    return vy_obj->id();
  }

  /**
   * @brief Builds one local vineyard dataframe chunk, one column per selector.
   * The chunk is persisted and registered in a global dataframe through
   * MPIGlobalDataFrameBuilder.
   *
   * Must be called by every worker: it performs an MPI_Allreduce.
   *
   * @return The id of the sealed global dataframe, or an error if a selector
   *   type is unsupported, a named property is not in the context, or
   *   persisting the chunk fails.
   */
  bl::result<vineyard::ObjectID> ToVineyardDataframe(
      const grape::CommSpec& comm_spec, vineyard::Client& client,
      const std::vector<std::pair<std::string, Selector>>& selectors,
      const std::pair<std::string, std::string>& range) override {
    auto& frag = ctx_->fragment();
    TransformUtils<FRAG_T> trans_utils(comm_spec, frag);
    auto vertices = trans_utils.SelectVertices(range);
    size_t local_num = vertices.size();
    size_t total_num = 0;
    vineyard::DataFrameBuilder df_builder(client);

    MPI_Allreduce(&local_num, &total_num, 1, MPI_SIZE_T, MPI_SUM,
                  comm_spec.comm());

    df_builder.set_partition_index(frag.fid(), 0);
    df_builder.set_row_batch_index(frag.fid());

    for (const auto& e : selectors) {
      const auto& col_name = e.first;
      const auto& selector = e.second;
      switch (selector.type()) {
      case SelectorType::kVertexId: {
        BOOST_LEAF_AUTO(tensor_builder,
                        trans_utils.template VertexIdToVYTensorBuilder<oid_t>(
                            client, vertices));
        df_builder.AddColumn(col_name, tensor_builder);
        break;
      }
      case SelectorType::kVertexData: {
        BOOST_LEAF_AUTO(tensor_builder, trans_utils.VertexDataToVYTensorBuilder(
                                            client, vertices));
        df_builder.AddColumn(col_name, tensor_builder);
        break;
      }
      case SelectorType::kResult: {
        const auto& prop_name = selector.property_name();
        const auto& properties_map = ctx_->properties_map();
        auto iter = properties_map.find(prop_name);
        if (iter == properties_map.end()) {
          RETURN_GS_ERROR(
              vineyard::ErrorCode::kInvalidValueError,
              "Property " + prop_name + " can not found in context.");
        }
        auto column = iter->second;
        BOOST_LEAF_AUTO(tensor_builder, column_to_vy_tensor_builder<FRAG_T>(
                                            client, column, vertices));
        df_builder.AddColumn(col_name, tensor_builder);
        break;
      }
      default:
        RETURN_GS_ERROR(
            vineyard::ErrorCode::kUnsupportedOperationError,
            "Unsupported operation, available selector type: vid,vdata "
            "and result. selector: " +
                selector.str());
      }
    }

    auto df = df_builder.Seal(client);
    VY_OK_OR_RAISE(df->Persist(client));
    auto df_chunk_id = df->id();

    MPIGlobalDataFrameBuilder builder(client, comm_spec);
    builder.set_partition_shape(frag.fnum(), selectors.size());
    builder.AddChunk(df_chunk_id);
    auto vy_obj = builder.Seal(client);
    return vy_obj->id();
  }

  /**
   * @brief Converts each selected column of this worker's data into an Arrow
   * array.
   *
   * @return (column name, array) pairs in the order of @p selectors, or an
   *   error if a selector type is unsupported or a named column is not in the
   *   context.
   */
  bl::result<std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>>
  ToArrowArrays(
      const grape::CommSpec& comm_spec,
      const std::vector<std::pair<std::string, Selector>>& selectors) override {
    std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>> ret;
    ret.reserve(selectors.size());
    auto& frag = ctx_->fragment();
    TransformUtils<FRAG_T> trans_utils(comm_spec, frag);

    for (const auto& pair : selectors) {
      const auto& col_name = pair.first;
      const auto& selector = pair.second;
      std::shared_ptr<arrow::Array> arr;
      switch (selector.type()) {
      case SelectorType::kVertexId: {
        BOOST_LEAF_ASSIGN(arr, trans_utils.VertexIdToArrowArray());
        break;
      }
      case SelectorType::kVertexData: {
        BOOST_LEAF_ASSIGN(arr, trans_utils.VertexDataToArrowArray());
        break;
      }
      case SelectorType::kResult: {
        const auto& prop_name = selector.property_name();
        // Bind by reference: copying the whole map per selector is wasteful.
        const auto& properties_map = ctx_->properties_map();
        auto iter = properties_map.find(prop_name);
        if (iter == properties_map.end()) {
          RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                          "Column: " + prop_name + " not found in context.");
        }
        arr = iter->second->ToArrowArray();
        break;
      }
      default:
        RETURN_GS_ERROR(
            vineyard::ErrorCode::kUnsupportedOperationError,
            "Unsupported operation, available selector type: vid,vdata "
            "and result. selector: " +
                selector.str());
      }
      ret.emplace_back(col_name, arr);
    }
    return ret;
  }

 private:
  std::shared_ptr<IFragmentWrapper> frag_wrapper_;
  std::shared_ptr<context_t> ctx_;
};
} // namespace gs
#endif // ANALYTICAL_ENGINE_CORE_CONTEXT_VERTEX_PROPERTY_CONTEXT_H_