analytical_engine/core/context/vertex_data_context.h (839 lines of code) (raw):
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef ANALYTICAL_ENGINE_CORE_CONTEXT_VERTEX_DATA_CONTEXT_H_
#define ANALYTICAL_ENGINE_CORE_CONTEXT_VERTEX_DATA_CONTEXT_H_
#include <mpi.h>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
#ifdef NETWORKX
#include "core/object/dynamic.h"
#endif
#include "grape/app/context_base.h"
#include "grape/app/vertex_data_context.h"
#include "grape/serialization/in_archive.h"
#include "grape/utils/vertex_array.h"
#include "grape/worker/comm_spec.h"
#include "vineyard/basic/ds/arrow_utils.h"
#include "vineyard/basic/ds/dataframe.h"
#include "vineyard/client/client.h"
#include "vineyard/client/ds/i_object.h"
#include "vineyard/common/util/uuid.h"
#include "core/config.h"
#include "core/context/context_protocols.h"
#include "core/context/i_context.h"
#include "core/context/selector.h"
#include "core/context/tensor_dataframe_builder.h"
#include "core/error.h"
#include "core/server/rpc_utils.h"
#include "core/utils/mpi_utils.h"
#include "core/utils/transform_utils.h"
#include "proto/types.pb.h"
#define CONTEXT_TYPE_VERTEX_DATA "vertex_data"
#define CONTEXT_TYPE_LABELED_VERTEX_DATA "labeled_vertex_data"
#define CONTEXT_TTPE_DYNAMIC_VERTEX_DATA "dynamic_vertex_data"
namespace bl = boost::leaf;
namespace arrow {
class Array;
}
#ifdef NETWORKX
namespace grape {
/**
 * @brief Specialization of grape::VertexDataContext that stores one
 * gs::dynamic::Value per vertex.
 *
 * @tparam FRAG_T The fragment class
 */
template <typename FRAG_T>
class VertexDataContext<FRAG_T, gs::dynamic::Value> : public ContextBase {
  using fragment_t = FRAG_T;
  using oid_t = typename fragment_t::oid_t;
  using vertex_t = typename fragment_t::vertex_t;
  using vertex_array_t =
      typename fragment_t::template vertex_array_t<gs::dynamic::Value>;

 public:
  using data_t = gs::dynamic::Value;

  /**
   * @brief Size the per-vertex storage for the given fragment.
   *
   * @param fragment        The fragment; only a reference is kept, so it must
   *                        outlive this context
   * @param including_outer When true the storage is initialized over
   *                        fragment.Vertices(), otherwise over
   *                        fragment.InnerVertices()
   */
  explicit VertexDataContext(const fragment_t& fragment,
                             bool including_outer = false)
      : fragment_(fragment) {
    if (!including_outer) {
      data_.Init(fragment.InnerVertices());
    } else {
      data_.Init(fragment.Vertices());
    }
  }

  /** @brief The fragment this context was created on. */
  const fragment_t& fragment() { return fragment_; }

  /** @brief Mutable access to the per-vertex values. */
  inline virtual vertex_array_t& data() { return data_; }

  /** @brief The value stored for vertex v. */
  virtual const gs::dynamic::Value& GetVertexResult(const vertex_t& v) {
    return data_[v];
  }

 private:
  const fragment_t& fragment_;
  vertex_array_t data_;
};
} // namespace grape
#endif // NETWORKX
namespace gs {
class IFragmentWrapper;
/**
 * @brief Build an arrow array holding the context value of every vertex in
 * `vertices`, in iteration order. Overload for non-dynamic data types.
 *
 * @tparam FRAG_T The fragment class
 * @tparam DATA_T The data type held by the context; its arrow builder and
 *                array types come from vineyard::ConvertToArrowType
 * @param vertices The vertices whose values are exported
 * @param data     The per-vertex values, indexed by vertex
 * @return The arrow array, or an error if appending to or finishing the
 *         builder fails
 */
template <typename FRAG_T, typename DATA_T>
typename std::enable_if<!is_dynamic<DATA_T>::value,
                        bl::result<std::shared_ptr<arrow::Array>>>::type
context_data_to_arrow_array(
    const typename FRAG_T::vertex_range_t& vertices,
    const typename FRAG_T::template vertex_array_t<DATA_T>& data) {
  typename vineyard::ConvertToArrowType<DATA_T>::BuilderType builder;
  std::shared_ptr<typename vineyard::ConvertToArrowType<DATA_T>::ArrayType> arr;

  for (auto v : vertices) {
    ARROW_OK_OR_RAISE(builder.Append(data[v]));
  }
  // Check Finish() the same way as Append() so that a failure is handed back
  // through the bl::result instead of going through a separate check macro.
  ARROW_OK_OR_RAISE(builder.Finish(&arr));
  return std::dynamic_pointer_cast<arrow::Array>(arr);
}
/**
 * @brief Overload selected when DATA_T is a dynamic type. Dynamic values are
 * not converted to arrow arrays, so this always reports
 * kUnsupportedOperationError.
 *
 * @tparam FRAG_T The fragment class
 * @tparam DATA_T The (dynamic) data type held by the context
 * @param vertices Unused
 * @param data     Unused
 */
template <typename FRAG_T, typename DATA_T>
std::enable_if_t<is_dynamic<DATA_T>::value,
                 bl::result<std::shared_ptr<arrow::Array>>>
context_data_to_arrow_array(
    const typename FRAG_T::vertex_range_t& vertices,
    const typename FRAG_T::template vertex_array_t<DATA_T>& data) {
  RETURN_GS_ERROR(vineyard::ErrorCode::kUnsupportedOperationError,
                  "Can not transform dynamic type");
}
template <typename FRAG_T, typename COMPUTE_CONTEXT_T>
class PregelContext;
/**
 * @brief VertexDataContext for labeled fragments. One vertex array is kept
 * per vertex label.
 *
 * @tparam FRAG_T The fragment class (Labeled fragment only)
 * @tparam DATA_T The data type held by the context; must be a POD type or
 *                std::string
 */
template <typename FRAG_T, typename DATA_T>
class LabeledVertexDataContext : public grape::ContextBase {
  using fragment_t = FRAG_T;
  using vertex_t = typename fragment_t::vertex_t;
  using vid_t = typename fragment_t::vid_t;
  using oid_t = typename fragment_t::oid_t;
  using label_id_t = typename fragment_t::label_id_t;
  using vertex_array_t =
      grape::VertexArray<typename fragment_t::vertices_t, DATA_T>;

 public:
  using data_t = DATA_T;
  static_assert(std::is_pod<data_t>::value ||
                    std::is_same<data_t, std::string>::value,
                "Unsupported data type");

  /**
   * @brief Allocate one vertex array per vertex label of the fragment.
   *
   * @param fragment        The fragment; only a reference is kept, so it must
   *                        outlive this context
   * @param including_outer When true each array is initialized over
   *                        fragment.Vertices(label), otherwise over
   *                        fragment.InnerVertices(label)
   */
  explicit LabeledVertexDataContext(const fragment_t& fragment,
                                    bool including_outer = false)
      : fragment_(fragment) {
    auto label_count = fragment_.vertex_label_num();
    data_.resize(label_count);
    for (label_id_t label = 0; label < label_count; ++label) {
      auto& per_label = data_[label];
      if (!including_outer) {
        per_label.Init(fragment.InnerVertices(label));
      } else {
        per_label.Init(fragment.Vertices(label));
      }
    }
  }

  /** @brief The fragment this context was created on. */
  const fragment_t& fragment() { return fragment_; }

  /**
   * @brief Look up the value stored for a vertex.
   *
   * NOTE(review): the array of the vertex's label is indexed with a vertex
   * built from fragment_.vertex_offset(v), while other code in this file
   * indexes the per-label arrays with the vertex itself - confirm that the
   * offset-based lookup is intended.
   */
  const data_t& GetValue(vertex_t v) const {
    label_id_t label = fragment_.vertex_label(v);
    int64_t offset = fragment_.vertex_offset(v);
    return data_[label][vertex_t{offset}];
  }

  /** @brief Mutable access to the per-label vertex arrays. */
  std::vector<vertex_array_t>& data() { return data_; }

 private:
  const fragment_t& fragment_;
  std::vector<vertex_array_t> data_;
};
/**
 * @brief This is the wrapper class for VertexDataContext. A series of methods
 * are provided to transform the data held by the context.
 *
 * @tparam FRAG_T The fragment class (Non-labeled fragment only)
 * @tparam DATA_T The data type held by the context
 */
template <typename FRAG_T, typename DATA_T>
class VertexDataContextWrapper : public IVertexDataContextWrapper {
using fragment_t = FRAG_T;
using oid_t = typename fragment_t::oid_t;
using vertex_t = typename fragment_t::vertex_t;
using context_t = grape::VertexDataContext<fragment_t, DATA_T>;
using vdata_t = typename fragment_t::vdata_t;
using data_t = DATA_T;
public:
/**
 * @brief Create a wrapper around a computed context.
 *
 * @param id           Identifier forwarded to IVertexDataContextWrapper
 * @param frag_wrapper Wrapper of the fragment the context belongs to
 * @param ctx          The wrapped context
 */
explicit VertexDataContextWrapper(
    const std::string& id, std::shared_ptr<IFragmentWrapper> frag_wrapper,
    std::shared_ptr<context_t> ctx)
    : IVertexDataContextWrapper(id),
      frag_wrapper_(std::move(frag_wrapper)),
      ctx_(std::move(ctx)) {}

/** @brief The context type tag, CONTEXT_TYPE_VERTEX_DATA. */
std::string context_type() override {
  return std::string(CONTEXT_TYPE_VERTEX_DATA);
}

/** @brief The fragment wrapper handed to the constructor. */
std::shared_ptr<IFragmentWrapper> fragment_wrapper() override {
  return frag_wrapper_;
}
/**
 * @brief Serialize the column picked by `selector` into an archive using the
 * N-d array layout.
 *
 * The method issues MPI_Reduce and gather_archives over `comm_spec`, so it
 * has to be called on every worker. Only the worker holding fragment 0
 * writes the header: the number of dimensions (always 1), the total number
 * of selected vertices, the element type id and the total number again.
 * Every worker then appends its local payload, and gather_archives is called
 * with the archive size recorded right before that payload.
 *
 * @param comm_spec Communication spec shared by all workers
 * @param selector  The column to export: vertex id, vertex label id, vertex
 *                  data or the context result
 * @param range     Vertex range handed to TransformUtils::SelectVertices
 * @return The archive, or an error when the selector type is not supported
 *         or one of the helpers fails
 */
bl::result<std::unique_ptr<grape::InArchive>> ToNdArray(
    const grape::CommSpec& comm_spec, const Selector& selector,
    const std::pair<std::string, std::string>& range) override {
  auto& frag = ctx_->fragment();
  auto& data = ctx_->data();
  TransformUtils<FRAG_T> trans_utils(comm_spec, frag);
  auto vertices = trans_utils.SelectVertices(range);
  int64_t local_num = static_cast<int64_t>(vertices.size());
  int64_t total_num = 0;  // only filled in on the worker of fragment 0
  auto arc = std::make_unique<grape::InArchive>();

  if (comm_spec.fid() == 0) {
    MPI_Reduce(&local_num, &total_num, 1, MPI_INT64_T, MPI_SUM,
               comm_spec.worker_id(), comm_spec.comm());
    *arc << static_cast<int64_t>(1);  // shape size
    *arc << total_num;
  } else {
    MPI_Reduce(&local_num, nullptr, 1, MPI_INT64_T, MPI_SUM,
               comm_spec.FragToWorker(0), comm_spec.comm());
  }

  size_t old_size = 0;
  switch (selector.type()) {
  case SelectorType::kVertexId: {
    // N.B. This method must be invoked on every worker!
    BOOST_LEAF_AUTO(type_id, trans_utils.GetOidTypeId());
    if (comm_spec.fid() == 0) {
      *arc << static_cast<int>(type_id);
      *arc << total_num;
    }
    old_size = arc->GetSize();
    trans_utils.SerializeVertexId(vertices, *arc);
    break;
  }
  case SelectorType::kVertexLabelId: {
    if (comm_spec.fid() == 0) {
      *arc << static_cast<int>(vineyard::TypeToInt<int>::value);
      // Write the element count after the type id, exactly like every other
      // selector branch, so the header layout does not depend on the
      // selector.
      *arc << total_num;
    }
    old_size = arc->GetSize();
    BOOST_LEAF_CHECK(trans_utils.SerializeVertexLabelId(vertices, *arc));
    break;
  }
  case SelectorType::kVertexData: {
    if (comm_spec.fid() == 0) {
      *arc << static_cast<int>(vineyard::TypeToInt<vdata_t>::value);
      *arc << total_num;
    }
    old_size = arc->GetSize();
    trans_utils.SerializeVertexData(vertices, *arc);
    break;
  }
  case SelectorType::kResult: {
    if (comm_spec.fid() == 0) {
      *arc << static_cast<int>(vineyard::TypeToInt<data_t>::value);
      *arc << total_num;
    }
    old_size = arc->GetSize();
    for (auto v : vertices) {
      *arc << data[v];
    }
    break;
  }
  default:
    RETURN_GS_ERROR(
        vineyard::ErrorCode::kUnsupportedOperationError,
        "Unsupported operation, available selector type: vid,vdata "
        "and result. selector: " +
            selector.str());
  }
  gather_archives(*arc, comm_spec, old_size);

  return std::move(arc);
}
/**
 * @brief Serialize several columns of the context into one archive using the
 * dataframe layout.
 *
 * The method issues MPI_Reduce and, once per column, gather_archives over
 * `comm_spec`, so it has to be called on every worker. The worker holding
 * fragment 0 writes the number of columns and the total number of selected
 * vertices, and then for each column its name and element type id. Every
 * worker appends its local payload of the column, and gather_archives is
 * called with the archive size recorded right before that payload.
 *
 * @param comm_spec Communication spec shared by all workers
 * @param selectors (column name, selector) pairs, exported in order
 * @param range     Vertex range handed to TransformUtils::SelectVertices
 * @return The archive, or an error when a selector type is not supported or
 *         one of the helpers fails
 */
bl::result<std::unique_ptr<grape::InArchive>> ToDataframe(
    const grape::CommSpec& comm_spec,
    const std::vector<std::pair<std::string, Selector>>& selectors,
    const std::pair<std::string, std::string>& range) override {
  auto& frag = ctx_->fragment();
  auto& data = ctx_->data();
  TransformUtils<FRAG_T> trans_utils(comm_spec, frag);
  auto vertices = trans_utils.SelectVertices(range);
  auto local_num = static_cast<int64_t>(vertices.size());
  auto arc = std::make_unique<grape::InArchive>();

  if (comm_spec.fid() == 0) {
    int64_t total_num = 0;
    MPI_Reduce(&local_num, &total_num, 1, MPI_INT64_T, MPI_SUM,
               comm_spec.worker_id(), comm_spec.comm());
    *arc << static_cast<int64_t>(selectors.size());
    *arc << total_num;
  } else {
    MPI_Reduce(&local_num, nullptr, 1, MPI_INT64_T, MPI_SUM,
               comm_spec.FragToWorker(0), comm_spec.comm());
  }

  for (const auto& pair : selectors) {
    // Bind by reference: copying the name and the selector for every column
    // is unnecessary, they are only read below.
    const auto& col_name = pair.first;
    const auto& selector = pair.second;

    if (comm_spec.fid() == 0) {
      *arc << col_name;
    }

    size_t old_size = 0;
    switch (selector.type()) {
    case SelectorType::kVertexId: {
      BOOST_LEAF_AUTO(type_id, trans_utils.GetOidTypeId());
      if (comm_spec.fid() == 0) {
        *arc << static_cast<int>(type_id);
      }
      old_size = arc->GetSize();
      trans_utils.SerializeVertexId(vertices, *arc);
      break;
    }
    case SelectorType::kVertexLabelId: {
      if (comm_spec.fid() == 0) {
        *arc << static_cast<int>(vineyard::TypeToInt<int>::value);
      }
      old_size = arc->GetSize();
      BOOST_LEAF_CHECK(trans_utils.SerializeVertexLabelId(vertices, *arc));
      break;
    }
    case SelectorType::kVertexData: {
      if (comm_spec.fid() == 0) {
        *arc << static_cast<int>(vineyard::TypeToInt<vdata_t>::value);
      }
      old_size = arc->GetSize();
      trans_utils.SerializeVertexData(vertices, *arc);
      break;
    }
    case SelectorType::kResult: {
      if (comm_spec.fid() == 0) {
        *arc << static_cast<int>(vineyard::TypeToInt<data_t>::value);
      }
      old_size = arc->GetSize();
      for (auto v : vertices) {
        *arc << data[v];
      }
      break;
    }
    default:
      RETURN_GS_ERROR(
          vineyard::ErrorCode::kUnsupportedOperationError,
          "Unsupported operation, available selector type: vid,vdata "
          "and result. selector: " +
              selector.str());
    }
    gather_archives(*arc, comm_spec, old_size);
  }
  return std::move(arc);
}
/**
 * @brief Export the column picked by `selector` as a global vineyard tensor.
 *
 * Each worker builds a tensor chunk from its selected vertices and adds it
 * to an MPIGlobalTensorBuilder whose shape is the total number of selected
 * vertices and whose partition shape is the number of fragments. The method
 * calls MPI_Allreduce over `comm_spec`, so it has to be called on every
 * worker.
 *
 * @param comm_spec Communication spec shared by all workers
 * @param client    Vineyard client used to build and seal the objects
 * @param selector  The column to export: vertex id, vertex data or the
 *                  context result
 * @param range     Vertex range handed to TransformUtils::SelectVertices
 * @return The id of the sealed global tensor, or an error when the selector
 *         type is not supported or building a chunk fails
 */
bl::result<vineyard::ObjectID> ToVineyardTensor(
    const grape::CommSpec& comm_spec, vineyard::Client& client,
    const Selector& selector,
    const std::pair<std::string, std::string>& range) override {
  auto& frag = ctx_->fragment();
  auto& data = ctx_->data();
  TransformUtils<FRAG_T> trans_utils(comm_spec, frag);
  auto vertices = trans_utils.SelectVertices(range);
  size_t local_num = vertices.size();
  size_t total_num = 0;

  MPI_Allreduce(&local_num, &total_num, 1, MPI_SIZE_T, MPI_SUM,
                comm_spec.comm());

  vineyard::ObjectID tensor_chunk_id;
  switch (selector.type()) {
  case SelectorType::kVertexId: {
    BOOST_LEAF_ASSIGN(tensor_chunk_id,
                      trans_utils.VertexIdToVYTensor(client, vertices));
    break;
  }
  case SelectorType::kVertexData: {
    BOOST_LEAF_ASSIGN(tensor_chunk_id,
                      trans_utils.VertexDataToVYTensor(client, vertices));
    break;
  }
  case SelectorType::kResult: {
    // Maps a position in `vertices` to the context value of that vertex.
    auto f = [&data, &vertices](size_t i) {
      auto v = vertices[i];
      return data[v];
    };
    BOOST_LEAF_ASSIGN(
        tensor_chunk_id,
        build_vy_tensor(client, vertices.size(), f, comm_spec.fid()));
    break;
  }
  default:
    RETURN_GS_ERROR(
        vineyard::ErrorCode::kUnsupportedOperationError,
        "Unsupported operation, available selector type: vid,vdata "
        "and result. selector: " +
            selector.str());
  }

  MPIGlobalTensorBuilder builder(client, comm_spec);
  builder.set_shape({static_cast<int64_t>(total_num)});
  builder.set_partition_shape({static_cast<int64_t>(frag.fnum())});
  builder.AddChunk(tensor_chunk_id);

  auto value = builder.Seal(client);
  return value->id();
}
/**
 * @brief Export several columns of the context as a global vineyard
 * dataframe.
 *
 * Each worker builds a dataframe chunk with one column per selector, seals
 * and persists it, and adds it to an MPIGlobalDataFrameBuilder whose
 * partition shape is (number of fragments, number of columns). The method
 * calls MPI_Allreduce over `comm_spec`, so it has to be called on every
 * worker.
 *
 * @param comm_spec Communication spec shared by all workers
 * @param client    Vineyard client used to build and seal the objects
 * @param selectors (column name, selector) pairs; supported selector types
 *                  are vertex id, vertex data and the context result
 * @param range     Vertex range handed to TransformUtils::SelectVertices
 * @return The id of the sealed global dataframe, or an error when a selector
 *         type is not supported, building a column fails or persisting the
 *         chunk fails
 */
bl::result<vineyard::ObjectID> ToVineyardDataframe(
    const grape::CommSpec& comm_spec, vineyard::Client& client,
    const std::vector<std::pair<std::string, Selector>>& selectors,
    const std::pair<std::string, std::string>& range) override {
  auto& frag = ctx_->fragment();
  auto& data = ctx_->data();
  TransformUtils<FRAG_T> trans_utils(comm_spec, frag);
  auto vertices = trans_utils.SelectVertices(range);
  size_t local_num = vertices.size();
  size_t total_num = 0;

  // total_num is not consumed below; the reduction is kept so that the
  // sequence of collective calls made by the workers stays as it was.
  MPI_Allreduce(&local_num, &total_num, 1, MPI_SIZE_T, MPI_SUM,
                comm_spec.comm());

  vineyard::DataFrameBuilder df_builder(client);
  df_builder.set_partition_index(frag.fid(), 0);
  df_builder.set_row_batch_index(frag.fid());

  for (auto& e : selectors) {
    auto& col_name = e.first;
    auto& selector = e.second;

    switch (selector.type()) {
    case SelectorType::kVertexId: {
      BOOST_LEAF_AUTO(tensor_builder,
                      trans_utils.template VertexIdToVYTensorBuilder<oid_t>(
                          client, vertices));
      df_builder.AddColumn(col_name, tensor_builder);
      break;
    }
    case SelectorType::kVertexData: {
      BOOST_LEAF_AUTO(tensor_builder, trans_utils.VertexDataToVYTensorBuilder(
                                          client, vertices));
      df_builder.AddColumn(col_name, tensor_builder);
      break;
    }
    case SelectorType::kResult: {
      // Maps a position in `vertices` to the context value of that vertex.
      auto f = [&data, &vertices](size_t i) { return data[vertices[i]]; };
      BOOST_LEAF_AUTO(tensor_builder,
                      build_vy_tensor_builder(client, vertices.size(), f,
                                              comm_spec.fid()));
      df_builder.AddColumn(col_name, tensor_builder);
      break;
    }
    default:
      RETURN_GS_ERROR(
          vineyard::ErrorCode::kUnsupportedOperationError,
          "Unsupported operation, available selector type: vid,vdata "
          "and result. selector: " +
              selector.str());
    }
  }

  auto df = df_builder.Seal(client);
  VY_OK_OR_RAISE(df->Persist(client));
  auto df_chunk_id = df->id();

  MPIGlobalDataFrameBuilder builder(client, comm_spec);
  builder.set_partition_shape(frag.fnum(), selectors.size());
  builder.AddChunk(df_chunk_id);

  auto vy_obj = builder.Seal(client);
  return vy_obj->id();
}
/**
 * @brief Convert the selected columns into arrow arrays.
 *
 * Vertex ids and vertex data are produced by TransformUtils; the context
 * result is built from the values of the fragment's inner vertices.
 *
 * @param comm_spec Communication spec used to construct TransformUtils
 * @param selectors (column name, selector) pairs; supported selector types
 *                  are vertex id, vertex data and the context result
 * @return (column name, arrow array) pairs in the order of `selectors`, or
 *         an error when a selector type is not supported or a conversion
 *         fails
 */
bl::result<std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>>
ToArrowArrays(
    const grape::CommSpec& comm_spec,
    const std::vector<std::pair<std::string, Selector>>& selectors) override {
  auto& frag = ctx_->fragment();
  auto& data = ctx_->data();
  TransformUtils<FRAG_T> trans_utils(comm_spec, frag);
  std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>> columns;

  for (auto& entry : selectors) {
    auto& name = entry.first;
    auto& selector = entry.second;
    std::shared_ptr<arrow::Array> column;

    switch (selector.type()) {
    case SelectorType::kVertexId: {
      BOOST_LEAF_ASSIGN(column, trans_utils.VertexIdToArrowArray());
      break;
    }
    case SelectorType::kVertexData: {
      BOOST_LEAF_ASSIGN(column, trans_utils.VertexDataToArrowArray());
      break;
    }
    case SelectorType::kResult: {
      // Kept in a local: the template argument list contains a comma, which
      // would split the macro arguments if written inline.
      auto converted = context_data_to_arrow_array<fragment_t, data_t>(
          frag.InnerVertices(), data);
      BOOST_LEAF_ASSIGN(column, converted);
      break;
    }
    default:
      RETURN_GS_ERROR(
          vineyard::ErrorCode::kUnsupportedOperationError,
          "Unsupported operation, available selector type: vid,vdata "
          "and result. selector: " +
              selector.str());
    }
    columns.emplace_back(name, column);
  }
  return columns;
}
private:
std::shared_ptr<IFragmentWrapper> frag_wrapper_;
std::shared_ptr<context_t> ctx_;
};
#ifdef NETWORKX
/**
 * @brief This is the dynamic::Value specialization of
 * VertexDataContextWrapper. Only GetContextData is supported; every
 * transformation method reports kInvalidOperationError.
 *
 * @tparam FRAG_T The fragment class (Non-labeled fragment only)
 */
template <typename FRAG_T>
class VertexDataContextWrapper<FRAG_T, dynamic::Value>
    : public IVertexDataContextWrapper {
  using fragment_t = FRAG_T;
  using oid_t = typename fragment_t::oid_t;
  using vertex_t = typename fragment_t::vertex_t;
  using context_t = grape::VertexDataContext<fragment_t, dynamic::Value>;
  using vdata_t = typename fragment_t::vdata_t;
  using data_t = dynamic::Value;

 public:
  /**
   * @brief Create a wrapper around a computed context.
   *
   * @param id           Identifier forwarded to IVertexDataContextWrapper
   * @param frag_wrapper Wrapper of the fragment the context belongs to
   * @param ctx          The wrapped context
   */
  explicit VertexDataContextWrapper(
      const std::string& id, std::shared_ptr<IFragmentWrapper> frag_wrapper,
      std::shared_ptr<context_t> ctx)
      : IVertexDataContextWrapper(id),
        frag_wrapper_(std::move(frag_wrapper)),
        ctx_(std::move(ctx)) {}

  /** @brief The context type tag for dynamic vertex data contexts. */
  std::string context_type() override {
    return std::string(CONTEXT_TTPE_DYNAMIC_VERTEX_DATA);
  }

  /** @brief The fragment wrapper handed to the constructor. */
  std::shared_ptr<IFragmentWrapper> fragment_wrapper() override {
    return frag_wrapper_;
  }

  /**
   * @brief Look up the result of one node.
   *
   * The node is read from the rpc::NODE parameter and parsed into an oid.
   *
   * @param params Request parameters; must contain rpc::NODE as a string
   * @return The stringified result of the node, an empty string when the
   *         fragment does not have the node, or an error when the parameter
   *         is missing
   */
  bl::result<std::string> GetContextData(const rpc::GSParams& params) override {
    BOOST_LEAF_AUTO(node_in_json, params.Get<std::string>(rpc::NODE));
    oid_t oid;
    dynamic::Parse(node_in_json, oid);

    auto& frag = ctx_->fragment();
    if (!frag.HasNode(oid)) {
      return std::string("");
    }

    vertex_t v;
    frag.GetVertex(oid, v);
    return dynamic::Stringify(ctx_->GetVertexResult(v));
  }

  /** @brief Not supported for dynamic values; always returns an error. */
  bl::result<std::unique_ptr<grape::InArchive>> ToNdArray(
      const grape::CommSpec& comm_spec, const Selector& selector,
      const std::pair<std::string, std::string>& range) override {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "DynamicVertexDataContext not support the operation.");
  }

  /** @brief Not supported for dynamic values; always returns an error. */
  bl::result<std::unique_ptr<grape::InArchive>> ToDataframe(
      const grape::CommSpec& comm_spec,
      const std::vector<std::pair<std::string, Selector>>& selectors,
      const std::pair<std::string, std::string>& range) override {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "DynamicVertexDataContext not support the operation.");
  }

  /** @brief Not supported for dynamic values; always returns an error. */
  bl::result<vineyard::ObjectID> ToVineyardTensor(
      const grape::CommSpec& comm_spec, vineyard::Client& client,
      const Selector& selector,
      const std::pair<std::string, std::string>& range) override {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "DynamicVertexDataContext not support the operation.");
  }

  /** @brief Not supported for dynamic values; always returns an error. */
  bl::result<vineyard::ObjectID> ToVineyardDataframe(
      const grape::CommSpec& comm_spec, vineyard::Client& client,
      const std::vector<std::pair<std::string, Selector>>& selectors,
      const std::pair<std::string, std::string>& range) override {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "DynamicVertexDataContext not support the operation.");
  }

  /** @brief Not supported for dynamic values; always returns an error. */
  bl::result<std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>>
  ToArrowArrays(
      const grape::CommSpec& comm_spec,
      const std::vector<std::pair<std::string, Selector>>& selectors) override {
    RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
                    "DynamicVertexDataContext not support the operation.");
  }

 private:
  std::shared_ptr<IFragmentWrapper> frag_wrapper_;
  std::shared_ptr<context_t> ctx_;
};
#endif // NETWORKX
/**
 * @brief This is the wrapper class for LabeledVertexDataContext. A series of
 * methods are provided to transform the data held by the context.
 *
 * @tparam FRAG_T The fragment class (Labeled fragment only)
 * @tparam DATA_T The data type held by the context
 */
template <typename FRAG_T, typename DATA_T>
class LabeledVertexDataContextWrapper
: public ILabeledVertexDataContextWrapper {
using fragment_t = FRAG_T;
using oid_t = typename fragment_t::oid_t;
using vertex_t = typename fragment_t::vertex_t;
using label_id_t = typename fragment_t::label_id_t;
using prop_id_t = typename fragment_t::prop_id_t;
using context_t = LabeledVertexDataContext<FRAG_T, DATA_T>;
using data_t = DATA_T;
public:
/**
 * @brief Create a wrapper around a computed labeled context.
 *
 * @param id           Identifier forwarded to
 *                     ILabeledVertexDataContextWrapper
 * @param frag_wrapper Wrapper of the fragment the context belongs to
 * @param context      The wrapped context
 */
explicit LabeledVertexDataContextWrapper(
    const std::string& id, std::shared_ptr<IFragmentWrapper> frag_wrapper,
    std::shared_ptr<context_t> context)
    : ILabeledVertexDataContextWrapper(id),
      frag_wrapper_(std::move(frag_wrapper)),
      ctx_(std::move(context)) {}

/** @brief The context type tag, CONTEXT_TYPE_LABELED_VERTEX_DATA. */
std::string context_type() override {
  return std::string(CONTEXT_TYPE_LABELED_VERTEX_DATA);
}

/** @brief The fragment wrapper handed to the constructor. */
std::shared_ptr<IFragmentWrapper> fragment_wrapper() override {
  return frag_wrapper_;
}
/**
 * @brief Serialize one column of the vertices with the selector's label into
 * an archive using the N-d array layout.
 *
 * The method issues MPI_Reduce and gather_archives over `comm_spec`, so it
 * has to be called on every worker. The worker holding fragment 0 writes the
 * header: the number of dimensions (always 1), the total number of selected
 * vertices, the element type id and the total number again. Every worker
 * then appends its local payload, and gather_archives is called with the
 * archive size recorded right before that payload.
 *
 * @param comm_spec Communication spec shared by all workers
 * @param selector  The column to export (vertex id, a vertex property or the
 *                  context result); it also supplies the vertex label
 * @param range     Vertex range handed to TransformUtils::SelectVertices
 * @return The archive, or an error when the selector type is not supported
 *         or serializing a property fails
 */
bl::result<std::unique_ptr<grape::InArchive>> ToNdArray(
    const grape::CommSpec& comm_spec, const LabeledSelector& selector,
    const std::pair<std::string, std::string>& range) override {
  auto& frag = ctx_->fragment();
  auto label_id = selector.label_id();
  TransformUtils<FRAG_T> trans_utils(comm_spec, frag);
  auto vertices = trans_utils.SelectVertices(label_id, range);
  auto arc = std::make_unique<grape::InArchive>();

  auto local_num = static_cast<int64_t>(vertices.size());
  int64_t total_num;  // only filled in on the worker of fragment 0
  if (comm_spec.fid() != 0) {
    MPI_Reduce(&local_num, NULL, 1, MPI_INT64_T, MPI_SUM,
               comm_spec.FragToWorker(0), comm_spec.comm());
  } else {
    MPI_Reduce(&local_num, &total_num, 1, MPI_INT64_T, MPI_SUM,
               comm_spec.worker_id(), comm_spec.comm());
    *arc << static_cast<int64_t>(1);  // # of dims
    *arc << total_num;
  }

  size_t payload_begin;
  switch (selector.type()) {
  case SelectorType::kVertexId: {
    if (comm_spec.fid() == 0) {
      *arc << static_cast<int>(vineyard::TypeToInt<oid_t>::value);
      *arc << total_num;
    }
    payload_begin = arc->GetSize();
    trans_utils.SerializeVertexId(vertices, *arc);
    break;
  }
  case SelectorType::kVertexData: {
    auto prop_id = selector.property_id();
    if (comm_spec.fid() == 0) {
      *arc << vineyard::ArrowDataTypeToInt(
          frag.vertex_property_type(label_id, prop_id));
      *arc << total_num;
    }
    payload_begin = arc->GetSize();
    BOOST_LEAF_CHECK(trans_utils.SerializeVertexProperty(vertices, label_id,
                                                         prop_id, *arc));
    break;
  }
  case SelectorType::kResult: {
    if (comm_spec.fid() == 0) {
      *arc << static_cast<int>(vineyard::TypeToInt<data_t>::value);
      *arc << total_num;
    }
    payload_begin = arc->GetSize();
    serialize_context_data(*arc, label_id, vertices);
    break;
  }
  default:
    RETURN_GS_ERROR(
        vineyard::ErrorCode::kUnsupportedOperationError,
        "Unsupported operation, available selector type: vid,vdata "
        "and result. selector: " +
            selector.str());
  }
  gather_archives(*arc, comm_spec, payload_begin);

  return std::move(arc);
}
/**
 * @brief Serialize several columns of the vertices of one label into an
 * archive using the dataframe layout.
 *
 * The vertex label is obtained from LabeledSelector::GetVertexLabelId. The
 * method issues MPI_Reduce and, once per column, gather_archives over
 * `comm_spec`, so it has to be called on every worker. The worker holding
 * fragment 0 writes the number of columns and the total number of selected
 * vertices, and then for each column its name and element type id. Every
 * worker appends its local payload of the column, and gather_archives is
 * called with the archive size recorded right before that payload.
 *
 * @param comm_spec Communication spec shared by all workers
 * @param selectors (column name, selector) pairs, exported in order
 * @param range     Vertex range handed to TransformUtils::SelectVertices
 * @return The archive, or an error when the label cannot be determined, a
 *         selector type is not supported or serializing a property fails
 */
bl::result<std::unique_ptr<grape::InArchive>> ToDataframe(
    const grape::CommSpec& comm_spec,
    const std::vector<std::pair<std::string, LabeledSelector>>& selectors,
    const std::pair<std::string, std::string>& range) override {
  auto& frag = ctx_->fragment();
  BOOST_LEAF_AUTO(label_id, LabeledSelector::GetVertexLabelId(selectors));
  TransformUtils<FRAG_T> trans_utils(comm_spec, frag);
  auto vertices = trans_utils.SelectVertices(label_id, range);
  auto arc = std::make_unique<grape::InArchive>();

  auto local_num = static_cast<int64_t>(vertices.size());
  if (comm_spec.fid() != 0) {
    MPI_Reduce(&local_num, NULL, 1, MPI_INT64_T, MPI_SUM,
               comm_spec.FragToWorker(0), comm_spec.comm());
  } else {
    int64_t total_num;
    MPI_Reduce(&local_num, &total_num, 1, MPI_INT64_T, MPI_SUM,
               comm_spec.worker_id(), comm_spec.comm());
    *arc << static_cast<int64_t>(selectors.size());
    *arc << total_num;
  }

  for (auto& column : selectors) {
    auto& selector = column.second;

    if (comm_spec.fid() == 0) {
      *arc << column.first;  // column name
    }

    size_t payload_begin;
    switch (selector.type()) {
    case SelectorType::kVertexId: {
      if (comm_spec.fid() == 0) {
        *arc << static_cast<int>(
            vineyard::TypeToInt<typename fragment_t::oid_t>::value);
      }
      payload_begin = arc->GetSize();
      trans_utils.SerializeVertexId(vertices, *arc);
      break;
    }
    case SelectorType::kVertexData: {
      auto prop_id = selector.property_id();
      if (comm_spec.fid() == 0) {
        *arc << vineyard::ArrowDataTypeToInt(
            frag.vertex_property_type(label_id, prop_id));
      }
      payload_begin = arc->GetSize();
      BOOST_LEAF_CHECK(trans_utils.SerializeVertexProperty(vertices, label_id,
                                                           prop_id, *arc));
      break;
    }
    case SelectorType::kResult: {
      if (comm_spec.fid() == 0) {
        *arc << static_cast<int>(vineyard::TypeToInt<data_t>::value);
      }
      payload_begin = arc->GetSize();
      serialize_context_data(*arc, label_id, vertices);
      break;
    }
    default:
      RETURN_GS_ERROR(
          vineyard::ErrorCode::kUnsupportedOperationError,
          "Unsupported operation, available selector type: vid,vdata "
          "and result. selector: " +
              selector.str());
    }
    gather_archives(*arc, comm_spec, payload_begin);
  }
  return std::move(arc);
}
/**
 * @brief Export one column of the vertices with the selector's label as a
 * global vineyard tensor.
 *
 * Each worker builds a tensor chunk from its selected vertices and adds it
 * to an MPIGlobalTensorBuilder whose shape is the total number of selected
 * vertices and whose partition shape is the number of fragments. The method
 * calls MPI_Allreduce over `comm_spec`, so it has to be called on every
 * worker.
 *
 * @param comm_spec Communication spec shared by all workers
 * @param client    Vineyard client used to build and seal the objects
 * @param selector  The column to export (vertex id, a vertex property or the
 *                  context result); it also supplies the vertex label
 * @param range     Vertex range handed to TransformUtils::SelectVertices
 * @return The id of the sealed global tensor, or an error when the selector
 *         type is not supported or building a chunk fails
 */
bl::result<vineyard::ObjectID> ToVineyardTensor(
    const grape::CommSpec& comm_spec, vineyard::Client& client,
    const LabeledSelector& selector,
    const std::pair<std::string, std::string>& range) override {
  auto& frag = ctx_->fragment();
  auto label_id = selector.label_id();
  auto& data = ctx_->data()[label_id];
  TransformUtils<FRAG_T> trans_utils(comm_spec, frag);
  auto vertices = trans_utils.SelectVertices(label_id, range);

  size_t local_num = vertices.size();
  size_t total_num;
  MPI_Allreduce(&local_num, &total_num, 1, MPI_SIZE_T, MPI_SUM,
                comm_spec.comm());

  vineyard::ObjectID chunk_id;
  switch (selector.type()) {
  case SelectorType::kVertexId: {
    BOOST_LEAF_ASSIGN(chunk_id,
                      trans_utils.VertexIdToVYTensor(client, vertices));
    break;
  }
  case SelectorType::kVertexData: {
    auto prop_id = selector.property_id();
    BOOST_LEAF_ASSIGN(chunk_id, trans_utils.VertexPropertyToVYTensor(
                                    client, label_id, prop_id, vertices));
    break;
  }
  case SelectorType::kResult: {
    // Maps a position in `vertices` to the context value of that vertex.
    auto value_at = [&data, &vertices](size_t i) {
      return data[vertices[i]];
    };
    BOOST_LEAF_ASSIGN(chunk_id, build_vy_tensor(client, vertices.size(),
                                                value_at, comm_spec.fid()));
    break;
  }
  default:
    RETURN_GS_ERROR(
        vineyard::ErrorCode::kUnsupportedOperationError,
        "Unsupported operation, available selector type: vid,vdata "
        "and result. selector: " +
            selector.str());
  }

  MPIGlobalTensorBuilder builder(client, comm_spec);
  builder.set_shape({static_cast<int64_t>(total_num)});
  builder.set_partition_shape({static_cast<int64_t>(frag.fnum())});
  builder.AddChunk(chunk_id);

  auto global_tensor = builder.Seal(client);
  return global_tensor->id();
}
/**
 * @brief Export several columns of the vertices of one label as a global
 * vineyard dataframe.
 *
 * The vertex label is obtained from LabeledSelector::GetVertexLabelId. Each
 * worker builds a dataframe chunk with one column per selector, seals and
 * persists it, and adds it to an MPIGlobalDataFrameBuilder whose partition
 * shape is (number of fragments, number of columns). The method calls
 * MPI_Allreduce over `comm_spec`, so it has to be called on every worker.
 *
 * @param comm_spec Communication spec shared by all workers
 * @param client    Vineyard client used to build and seal the objects
 * @param selectors (column name, selector) pairs; supported selector types
 *                  are vertex id, vertex property and the context result
 * @param range     Vertex range handed to TransformUtils::SelectVertices
 * @return The id of the sealed global dataframe, or an error when the label
 *         cannot be determined, a selector type is not supported, building a
 *         column fails or persisting the chunk fails
 */
bl::result<vineyard::ObjectID> ToVineyardDataframe(
    const grape::CommSpec& comm_spec, vineyard::Client& client,
    const std::vector<std::pair<std::string, LabeledSelector>>& selectors,
    const std::pair<std::string, std::string>& range) override {
  BOOST_LEAF_AUTO(label_id, LabeledSelector::GetVertexLabelId(selectors));
  auto& frag = ctx_->fragment();
  auto& data = ctx_->data()[label_id];
  TransformUtils<FRAG_T> trans_utils(comm_spec, frag);
  auto vertices = trans_utils.SelectVertices(label_id, range);
  size_t local_num = vertices.size();
  size_t total_num = 0;

  // total_num is not consumed below; the reduction is kept so that the
  // sequence of collective calls made by the workers stays as it was.
  MPI_Allreduce(&local_num, &total_num, 1, MPI_SIZE_T, MPI_SUM,
                comm_spec.comm());

  vineyard::DataFrameBuilder df_builder(client);
  df_builder.set_partition_index(frag.fid(), 0);
  df_builder.set_row_batch_index(frag.fid());

  for (auto& e : selectors) {
    auto& col_name = e.first;
    auto& selector = e.second;

    switch (selector.type()) {
    case SelectorType::kVertexId: {
      BOOST_LEAF_AUTO(tensor_builder,
                      trans_utils.template VertexIdToVYTensorBuilder<oid_t>(
                          client, vertices));
      df_builder.AddColumn(col_name, tensor_builder);
      break;
    }
    case SelectorType::kVertexData: {
      auto prop_id = selector.property_id();
      BOOST_LEAF_AUTO(tensor_builder,
                      trans_utils.VertexPropertyToVYTensorBuilder(
                          client, label_id, prop_id, vertices));
      df_builder.AddColumn(col_name, tensor_builder);
      break;
    }
    case SelectorType::kResult: {
      // Maps a position in `vertices` to the context value of that vertex.
      auto f = [&data, &vertices](size_t i) { return data[vertices[i]]; };
      BOOST_LEAF_AUTO(tensor_builder,
                      build_vy_tensor_builder(client, vertices.size(), f,
                                              comm_spec.fid()));
      df_builder.AddColumn(col_name, tensor_builder);
      break;
    }
    default:
      RETURN_GS_ERROR(
          vineyard::ErrorCode::kUnsupportedOperationError,
          "Unsupported operation, available selector type: vid,vdata "
          "and result. selector: " +
              selector.str());
    }
  }

  auto df = df_builder.Seal(client);
  VY_OK_OR_RAISE(df->Persist(client));
  auto df_chunk_id = df->id();

  MPIGlobalDataFrameBuilder builder(client, comm_spec);
  builder.set_partition_shape(frag.fnum(), selectors.size());
  builder.AddChunk(df_chunk_id);

  auto vy_obj = builder.Seal(client);
  return vy_obj->id();
}
/**
 * @brief Convert the selected columns into arrow arrays, grouped by the
 * vertex label of each selector.
 *
 * Vertex ids and vertex properties are produced by TransformUtils; the
 * context result is built from the values of the inner vertices of the
 * label. A result selector must not name a property.
 *
 * @param comm_spec Communication spec used to construct TransformUtils
 * @param selectors (column name, selector) pairs
 * @return A map from label id to the (column name, arrow array) pairs of
 *         that label, in the order of `selectors`; or an error when a
 *         selector type is not supported, a result selector names a
 *         property, or a conversion fails
 */
bl::result<std::map<
    label_id_t,
    std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>>>
ToArrowArrays(const grape::CommSpec& comm_spec,
              const std::vector<std::pair<std::string, LabeledSelector>>&
                  selectors) override {
  auto& frag = ctx_->fragment();
  TransformUtils<FRAG_T> trans_utils(comm_spec, frag);
  // <label id, <column name, arrow array>>
  std::map<label_id_t,
           std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>>
      columns_by_label;

  for (auto& entry : selectors) {
    auto& name = entry.first;
    auto& selector = entry.second;
    std::shared_ptr<arrow::Array> column;
    auto label_id = selector.label_id();

    switch (selector.type()) {
    case SelectorType::kVertexId: {
      BOOST_LEAF_ASSIGN(column, trans_utils.VertexIdToArrowArray(label_id));
      break;
    }
    case SelectorType::kVertexData: {
      auto prop_id = selector.property_id();
      column = trans_utils.VertexPropertyToArrowArray(label_id, prop_id);
      break;
    }
    case SelectorType::kResult: {
      auto& data = ctx_->data()[label_id];
      if (!selector.property_name().empty()) {
        RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
                        "Should not specify property name.");
      }
      // Kept in a local: the template argument list contains a comma, which
      // would split the macro arguments if written inline.
      auto converted = context_data_to_arrow_array<fragment_t, data_t>(
          frag.InnerVertices(label_id), data);
      BOOST_LEAF_ASSIGN(column, converted);
      break;
    }
    default:
      RETURN_GS_ERROR(
          vineyard::ErrorCode::kUnsupportedOperationError,
          "Unsupported operation, available selector type: vid,vdata "
          "and result. selector: " +
              selector.str());
    }
    columns_by_label[label_id].emplace_back(name, column);
  }
  return columns_by_label;
}
private:
/**
 * @brief Append the context value of every vertex in `vertices` to the
 * archive, in order.
 *
 * @param arc      The archive to append to
 * @param label_id Label whose vertex array in the context is read
 * @param vertices The vertices to serialize; each one indexes the array of
 *                 `label_id`
 */
void serialize_context_data(grape::InArchive& arc, label_id_t label_id,
                            const std::vector<vertex_t>& vertices) {
  auto& values = ctx_->data()[label_id];
  for (const auto& vertex : vertices) {
    arc << values[vertex];
  }
}
std::shared_ptr<IFragmentWrapper> frag_wrapper_;
std::shared_ptr<context_t> ctx_;
};
} // namespace gs
#endif // ANALYTICAL_ENGINE_CORE_CONTEXT_VERTEX_DATA_CONTEXT_H_