analytical_engine/core/context/tensor_context.h (854 lines of code) (raw):
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef ANALYTICAL_ENGINE_CORE_CONTEXT_TENSOR_CONTEXT_H_
#define ANALYTICAL_ENGINE_CORE_CONTEXT_TENSOR_CONTEXT_H_
#include <glog/logging.h>
#include <mpi.h>
#include <memory>
#include <ostream>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
#include "arrow/array/builder_binary.h"
#include "boost/leaf/error.hpp"
#include "boost/leaf/result.hpp"
#include "grape/app/context_base.h"
#include "grape/serialization/in_archive.h"
#include "grape/worker/comm_spec.h"
#include "vineyard/basic/ds/dataframe.h"
#include "vineyard/basic/ds/tensor.h"
#include "vineyard/client/client.h"
#include "vineyard/common/util/uuid.h"
#ifdef NETWORKX
#include "core/object/dynamic.h"
#endif
#include "core/config.h"
#include "core/context/context_protocols.h"
#include "core/context/i_context.h"
#include "core/context/selector.h"
#include "core/context/tensor_dataframe_builder.h"
#include "core/error.h"
#include "core/utils/mpi_utils.h"
#include "core/utils/transform_utils.h"
#include "core/utils/trivial_tensor.h"
#define CONTEXT_TYPE_TENSOR "tensor"
namespace bl = boost::leaf;
namespace grape {
template <typename T>
inline InArchive& operator<<(InArchive& in_archive,
const gs::trivial_tensor_t<T>& tensor) {
size_t size = tensor.size();
if (size > 0) {
in_archive.AddBytes(tensor.data(), size * sizeof(T));
}
return in_archive;
}
inline InArchive& operator<<(InArchive& in_archive,
const gs::trivial_tensor_t<std::string>& tensor) {
size_t size = tensor.size();
if (size > 0) {
for (size_t i = 0; i < tensor.size(); ++i) {
in_archive << tensor.data()->GetView(i);
}
}
return in_archive;
}
#ifdef NETWORKX
inline InArchive& operator<<(
InArchive& in_archive,
const gs::trivial_tensor_t<gs::dynamic::Value>& tensor) {
size_t size = tensor.size();
if (size > 0) {
auto type = gs::dynamic::GetType(tensor.data()[0]);
CHECK(type == gs::dynamic::Type::kInt32Type ||
type == gs::dynamic::Type::kInt64Type ||
type == gs::dynamic::Type::kDoubleType ||
type == gs::dynamic::Type::kStringType);
for (size_t i = 0; i < tensor.size(); i++) {
in_archive << tensor.data()[i];
}
}
return in_archive;
}
#endif
} // namespace grape
namespace gs {
class IFragmentWrapper;
template <typename T>
static bl::result<size_t> get_n_dim(const grape::CommSpec& comm_spec,
const trivial_tensor_t<T>& tensor) {
auto shape = tensor.shape();
auto n_dim = shape.size();
std::vector<size_t> n_dims;
vineyard::GlobalAllGatherv<size_t>(n_dim, n_dims, comm_spec);
// find out first n-dim of non empty shape
n_dim = 0;
for (auto e : n_dims) {
if (e != 0) {
n_dim = e;
break;
}
}
if (n_dim == 0) {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Every tensor is 0-dim.");
}
for (auto e : n_dims) {
if (e != 0 && e != n_dim) {
RETURN_GS_ERROR(vineyard::ErrorCode::kIllegalStateError,
"Dim count is not consistent.");
}
}
return n_dim;
}
template <typename T>
static bl::result<std::vector<size_t>> get_non_empty_shape(
const grape::CommSpec& comm_spec, const trivial_tensor_t<T>& tensor,
uint32_t axis) {
BOOST_LEAF_AUTO(n_dim, get_n_dim<T>(comm_spec, tensor));
auto shape = tensor.shape();
std::vector<std::vector<std::size_t>> shapes;
vineyard::GlobalAllGatherv<std::vector<std::size_t>>(shape, shapes,
comm_spec);
std::vector<size_t> first_shape;
// find out first non-empty shape
for (auto& sp : shapes) {
if (!sp.empty()) {
first_shape = sp;
break;
}
}
if (first_shape.empty()) {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Every tensor is 0-dim.");
}
// for every dim except the dim to concat
for (uint32_t i = 0; i < n_dim; i++) {
if (i != axis) {
for (auto& sp : shapes) {
if (!sp.empty() && sp[i] != first_shape[i]) {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Incompatible dimension of tensors");
}
}
}
}
return first_shape;
}
template <typename T>
static bl::result<size_t> get_n_column(const grape::CommSpec& comm_spec,
const trivial_tensor_t<T>& tensor) {
auto shape = tensor.shape();
if (!shape.empty() && shape.size() != 2) {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"This is not a 2-dim tensor.");
}
auto n_col = shape.empty() ? 0 : shape[1];
std::vector<size_t> n_cols;
vineyard::GlobalAllGatherv<size_t>(n_col, n_cols, comm_spec);
for (auto e : n_cols) {
if (e != 0) {
n_col = e;
break;
}
}
if (n_col == 0) {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Every tensor is empty.");
}
for (auto e : n_cols) {
if (e != 0 && e != n_col) {
std::stringstream ss;
ss << "Number of column is not same. ";
ss << "The column number of first non-empty is " << n_col;
ss << ". But this one is " << e;
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, ss.str());
}
}
return n_col;
}
/**
* @brief TensorContext is designed for holding a bunch of computation results.
* The TensorContext should be used if the number of elements are
* not related to the number of the vertex.
*
* @tparam FRAG_T
* @tparam DATA_T
*/
template <typename FRAG_T, typename DATA_T>
class TensorContext : public grape::ContextBase {
using fragment_t = FRAG_T;
using vertex_t = typename fragment_t::vertex_t;
using tensor_t = trivial_tensor_t<DATA_T>;
public:
using data_t = DATA_T;
explicit TensorContext(const fragment_t& fragment) : fragment_(fragment) {}
const fragment_t& fragment() { return fragment_; }
void assign(const std::vector<data_t>& data,
const std::vector<size_t>& shape) {
size_t size = 1;
for (size_t dim_size : shape) {
size *= dim_size;
}
CHECK_EQ(data.size(), size);
set_shape(shape);
memcpy(tensor_.data(), data.data(), sizeof(data_t) * data.size());
}
void assign(const data_t& data) { tensor_.fill(data); }
void set_shape(std::vector<std::size_t> shape) {
CHECK(!shape.empty());
tensor_.resize(shape);
}
std::vector<size_t> shape() const { return tensor_.shape(); }
inline tensor_t& tensor() { return tensor_; }
private:
const fragment_t& fragment_;
tensor_t tensor_;
};
template <typename FRAG_T>
class TensorContext<FRAG_T, std::string> : public grape::ContextBase {
using fragment_t = FRAG_T;
using vertex_t = typename fragment_t::vertex_t;
using tensor_t = trivial_tensor_t<std::string>;
public:
using data_t = std::string;
explicit TensorContext(const fragment_t& fragment) : fragment_(fragment) {}
const fragment_t& fragment() { return fragment_; }
void assign(const std::vector<data_t>& data,
const std::vector<size_t>& shape) {
size_t size = 1;
for (size_t dim_size : shape) {
size *= dim_size;
}
CHECK_EQ(data.size(), size);
set_shape(shape);
arrow::LargeStringBuilder builder;
CHECK_ARROW_ERROR(builder.AppendValues(data));
CHECK_ARROW_ERROR(builder.Finish(&(tensor_.data())));
}
void assign(const data_t& data) { tensor_.fill(data); }
void set_shape(std::vector<std::size_t> shape) {
CHECK(!shape.empty());
tensor_.resize(shape);
}
std::vector<size_t> shape() const { return tensor_.shape(); }
inline tensor_t& tensor() { return tensor_; }
private:
const fragment_t& fragment_;
tensor_t tensor_;
};
/**
* @brief TensorContextWrapper is the wrapper class for TensorContext.
*
* @tparam FRAG_T
* @tparam DATA_T
*/
template <typename FRAG_T, typename DATA_T, typename = void>
class TensorContextWrapper : public ITensorContextWrapper {
using fragment_t = FRAG_T;
using oid_t = typename fragment_t::oid_t;
using vertex_t = typename fragment_t::vertex_t;
using context_t = TensorContext<FRAG_T, DATA_T>;
using data_t = DATA_T;
public:
explicit TensorContextWrapper(const std::string& id,
std::shared_ptr<IFragmentWrapper> frag_wrapper,
std::shared_ptr<context_t> ctx)
: ITensorContextWrapper(id),
frag_wrapper_(std::move(frag_wrapper)),
ctx_(std::move(ctx)) {}
std::string context_type() override { return CONTEXT_TYPE_TENSOR; }
std::shared_ptr<IFragmentWrapper> fragment_wrapper() override {
return frag_wrapper_;
}
bl::result<std::unique_ptr<grape::InArchive>> ToNdArray(
const grape::CommSpec& comm_spec, uint32_t axis) override {
auto shape = ctx_->shape();
auto& tensor = ctx_->tensor();
auto arc = std::make_unique<grape::InArchive>();
BOOST_LEAF_AUTO(n_dim, get_n_dim<data_t>(comm_spec, tensor));
if (axis >= n_dim) {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
"Invalid axis " + std::to_string(axis) +
", n-dim: " + std::to_string(n_dim));
}
BOOST_LEAF_AUTO(first_shape, get_non_empty_shape(comm_spec, tensor, axis));
int64_t local_num = shape.empty() ? 0 : shape[axis], total_num;
if (comm_spec.fid() == 0) {
MPI_Reduce(&local_num, &total_num, 1, MPI_INT64_T, MPI_SUM,
comm_spec.worker_id(), comm_spec.comm());
*arc << static_cast<int64_t>(n_dim); // shape size
first_shape[axis] = total_num; // the shape after combined
for (auto dim_size : first_shape) {
*arc << static_cast<int64_t>(dim_size);
}
*arc << static_cast<int>(vineyard::TypeToInt<data_t>::value);
size_t total_size = first_shape.empty() ? 0 : 1;
for (auto e : first_shape) {
total_size *= e;
}
*arc << static_cast<int64_t>(total_size);
} else {
MPI_Reduce(&local_num, NULL, 1, MPI_INT64_T, MPI_SUM,
comm_spec.FragToWorker(0), comm_spec.comm());
}
auto old_size = arc->GetSize();
*arc << tensor;
gather_archives(*arc, comm_spec, old_size);
return arc;
}
bl::result<std::unique_ptr<grape::InArchive>> ToDataframe(
const grape::CommSpec& comm_spec) override {
auto shape = ctx_->shape();
auto& tensor = ctx_->tensor();
auto arc = std::make_unique<grape::InArchive>();
BOOST_LEAF_AUTO(n_dim, get_n_dim<data_t>(comm_spec, tensor));
if (n_dim != 2) {
RETURN_GS_ERROR(
vineyard::ErrorCode::kInvalidValueError,
"This is not a 2-dims tensor, n-dim: " + std::to_string(n_dim));
}
BOOST_LEAF_AUTO(n_col, get_n_column<data_t>(comm_spec, tensor));
int64_t n_row = shape.empty() ? 0 : shape[0];
int64_t total_n_row;
if (comm_spec.worker_id() == grape::kCoordinatorRank) {
MPI_Reduce(&n_row, &total_n_row, 1, MPI_INT64_T, MPI_SUM,
comm_spec.worker_id(), comm_spec.comm());
*arc << static_cast<int64_t>(n_col);
*arc << static_cast<int64_t>(total_n_row);
} else {
MPI_Reduce(&n_row, NULL, 1, MPI_INT64_T, MPI_SUM,
comm_spec.FragToWorker(0), comm_spec.comm());
}
for (size_t col_idx = 0; col_idx < n_col; col_idx++) {
if (comm_spec.worker_id() == grape::kCoordinatorRank) {
*arc << "Col " + std::to_string(col_idx); // Column name
*arc << static_cast<int>(vineyard::TypeToInt<data_t>::value);
}
// Python side requires columnar data structure
auto old_size = arc->GetSize();
for (auto row_idx = 0; row_idx < n_row; row_idx++) {
auto idx = row_idx * n_col + col_idx;
*arc << tensor.data()[idx];
}
gather_archives(*arc, comm_spec, old_size);
}
return arc;
}
bl::result<vineyard::ObjectID> ToVineyardTensor(
const grape::CommSpec& comm_spec, vineyard::Client& client,
uint32_t axis) override {
auto& frag = ctx_->fragment();
auto& tensor = ctx_->tensor();
auto local_shape = ctx_->shape();
BOOST_LEAF_AUTO(n_dim, get_n_dim<data_t>(comm_spec, tensor));
if (axis >= n_dim) {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
"Invalid axis " + std::to_string(axis) +
", n-dim: " + std::to_string(n_dim));
}
size_t local_num = local_shape.empty() ? 0 : local_shape[axis], total_num;
MPI_Allreduce(&local_num, &total_num, 1, MPI_SIZE_T, MPI_SUM,
comm_spec.comm());
BOOST_LEAF_AUTO(first_shape, get_non_empty_shape(comm_spec, tensor, axis));
first_shape[axis] = total_num; // the shape after combined
if (local_shape.empty()) {
local_shape.resize(n_dim, 0);
}
std::vector<int64_t> partition_index;
for (size_t i = 0; i < n_dim; i++) {
partition_index.push_back(frag.fid());
}
std::vector<int64_t> vy_tensor_shape;
for (auto e : local_shape) {
vy_tensor_shape.push_back(static_cast<int64_t>(e));
}
vineyard::TensorBuilder<data_t> tensor_builder(client, vy_tensor_shape,
partition_index);
for (size_t offset = 0; offset < tensor.size(); offset++) {
tensor_builder.data()[offset] = tensor.data()[offset];
}
auto vy_tensor = std::dynamic_pointer_cast<vineyard::Tensor<data_t>>(
tensor_builder.Seal(client));
VY_OK_OR_RAISE(vy_tensor->Persist(client));
std::vector<int64_t> global_shape;
std::vector<int64_t> global_partition_shape;
for (auto e : first_shape) {
global_shape.push_back(static_cast<int64_t>(e));
global_partition_shape.push_back(frag.fnum());
}
MPIGlobalTensorBuilder builder(client, comm_spec);
builder.set_shape(global_shape);
builder.set_partition_shape(global_partition_shape);
builder.AddChunk(vy_tensor->id());
return builder.Seal(client)->id();
}
bl::result<vineyard::ObjectID> ToVineyardDataframe(
const grape::CommSpec& comm_spec, vineyard::Client& client) override {
auto shape = ctx_->shape();
auto& tensor = ctx_->tensor();
auto& frag = ctx_->fragment();
BOOST_LEAF_AUTO(n_dim, get_n_dim<data_t>(comm_spec, tensor));
if (n_dim != 2) {
RETURN_GS_ERROR(
vineyard::ErrorCode::kInvalidValueError,
"This is not a 2-dims tensor, n-dim: " + std::to_string(n_dim));
}
BOOST_LEAF_AUTO(n_col, get_n_column<data_t>(comm_spec, tensor));
size_t n_row = shape.empty() ? 0 : shape[0];
vineyard::DataFrameBuilder df_builder(client);
df_builder.set_partition_index(frag.fid(), 0);
df_builder.set_row_batch_index(frag.fid());
for (size_t col_idx = 0; col_idx < n_col; col_idx++) {
std::vector<int64_t> shape{static_cast<int64_t>(n_row)};
auto tensor_builder =
std::make_shared<vineyard::TensorBuilder<DATA_T>>(client, shape);
for (size_t row_idx = 0; row_idx < n_row; row_idx++) {
auto idx = row_idx * n_col + col_idx;
tensor_builder->data()[row_idx] = tensor.data()[idx];
}
df_builder.AddColumn("Col " + std::to_string(col_idx), tensor_builder);
}
auto df = df_builder.Seal(client);
VY_OK_OR_RAISE(df->Persist(client));
auto df_chunk_id = df->id();
MPIGlobalDataFrameBuilder builder(client, comm_spec);
builder.set_partition_shape(frag.fnum(), n_col);
builder.AddChunk(df_chunk_id);
auto vy_obj = builder.Seal(client);
return vy_obj->id();
}
bl::result<std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>>
ToArrowArrays(
const grape::CommSpec& comm_spec,
const std::vector<std::pair<std::string, Selector>>& selectors) override {
auto& frag = ctx_->fragment();
auto& tensor = ctx_->tensor();
std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>
arrow_arrays;
for (auto& pair : selectors) {
auto& col_name = pair.first;
auto& selector = pair.second;
std::shared_ptr<arrow::Array> arr;
switch (selector.type()) {
case SelectorType::kResult: {
typename vineyard::ConvertToArrowType<data_t>::BuilderType builder;
std::shared_ptr<
typename vineyard::ConvertToArrowType<data_t>::ArrayType>
arr_ptr;
for (size_t offset = 0; offset < tensor.size(); offset++) {
ARROW_OK_OR_RAISE(builder.Append(tensor.data()[offset]));
}
CHECK_ARROW_ERROR(builder.Finish(&arr_ptr));
arr = std::dynamic_pointer_cast<arrow::Array>(arr_ptr);
break;
}
default:
RETURN_GS_ERROR(vineyard::ErrorCode::kUnsupportedOperationError,
"Unsupported operation, available selector type: "
"result. selector: " +
selector.str());
}
arrow_arrays.emplace_back(col_name, arr);
}
return arrow_arrays;
}
private:
std::shared_ptr<IFragmentWrapper> frag_wrapper_;
std::shared_ptr<context_t> ctx_;
};
template <typename FRAG_T>
class TensorContextWrapper<FRAG_T, std::string> : public ITensorContextWrapper {
using fragment_t = FRAG_T;
using oid_t = typename fragment_t::oid_t;
using vertex_t = typename fragment_t::vertex_t;
using context_t = TensorContext<FRAG_T, std::string>;
using data_t = std::string;
public:
explicit TensorContextWrapper(const std::string& id,
std::shared_ptr<IFragmentWrapper> frag_wrapper,
std::shared_ptr<context_t> ctx)
: ITensorContextWrapper(id),
frag_wrapper_(std::move(frag_wrapper)),
ctx_(std::move(ctx)) {}
std::string context_type() override { return CONTEXT_TYPE_TENSOR; }
std::shared_ptr<IFragmentWrapper> fragment_wrapper() override {
return frag_wrapper_;
}
bl::result<std::unique_ptr<grape::InArchive>> ToNdArray(
const grape::CommSpec& comm_spec, uint32_t axis) override {
auto shape = ctx_->shape();
auto& tensor = ctx_->tensor();
auto arc = std::make_unique<grape::InArchive>();
BOOST_LEAF_AUTO(n_dim, get_n_dim<data_t>(comm_spec, tensor));
if (axis >= n_dim) {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
"Invalid axis " + std::to_string(axis) +
", n-dim: " + std::to_string(n_dim));
}
BOOST_LEAF_AUTO(first_shape, get_non_empty_shape(comm_spec, tensor, axis));
int64_t local_num = shape.empty() ? 0 : shape[axis], total_num;
if (comm_spec.fid() == 0) {
MPI_Reduce(&local_num, &total_num, 1, MPI_INT64_T, MPI_SUM,
comm_spec.worker_id(), comm_spec.comm());
*arc << static_cast<int64_t>(n_dim); // shape size
first_shape[axis] = total_num; // the shape after combined
for (auto dim_size : first_shape) {
*arc << static_cast<int64_t>(dim_size);
}
*arc << static_cast<int>(vineyard::TypeToInt<data_t>::value);
size_t total_size = first_shape.empty() ? 0 : 1;
for (auto e : first_shape) {
total_size *= e;
}
*arc << static_cast<int64_t>(total_size);
} else {
MPI_Reduce(&local_num, NULL, 1, MPI_INT64_T, MPI_SUM,
comm_spec.FragToWorker(0), comm_spec.comm());
}
auto old_size = arc->GetSize();
*arc << tensor;
gather_archives(*arc, comm_spec, old_size);
return arc;
}
bl::result<std::unique_ptr<grape::InArchive>> ToDataframe(
const grape::CommSpec& comm_spec) override {
auto shape = ctx_->shape();
auto& tensor = ctx_->tensor();
auto arc = std::make_unique<grape::InArchive>();
BOOST_LEAF_AUTO(n_dim, get_n_dim<data_t>(comm_spec, tensor));
if (n_dim != 2) {
RETURN_GS_ERROR(
vineyard::ErrorCode::kInvalidValueError,
"This is not a 2-dims tensor, n-dim: " + std::to_string(n_dim));
}
BOOST_LEAF_AUTO(n_col, get_n_column<data_t>(comm_spec, tensor));
int64_t n_row = shape.empty() ? 0 : shape[0];
int64_t total_n_row;
if (comm_spec.worker_id() == grape::kCoordinatorRank) {
MPI_Reduce(&n_row, &total_n_row, 1, MPI_INT64_T, MPI_SUM,
comm_spec.worker_id(), comm_spec.comm());
*arc << static_cast<int64_t>(n_col);
*arc << static_cast<int64_t>(total_n_row);
} else {
MPI_Reduce(&n_row, NULL, 1, MPI_INT64_T, MPI_SUM,
comm_spec.FragToWorker(0), comm_spec.comm());
}
for (size_t col_idx = 0; col_idx < n_col; col_idx++) {
if (comm_spec.worker_id() == grape::kCoordinatorRank) {
*arc << "Col " + std::to_string(col_idx); // Column name
*arc << static_cast<int>(vineyard::TypeToInt<data_t>::value);
}
// Python side requires columnar data structure
auto old_size = arc->GetSize();
for (auto row_idx = 0; row_idx < n_row; row_idx++) {
auto idx = row_idx * n_col + col_idx;
*arc << tensor.data()->GetView(idx);
}
gather_archives(*arc, comm_spec, old_size);
}
return arc;
}
bl::result<vineyard::ObjectID> ToVineyardTensor(
const grape::CommSpec& comm_spec, vineyard::Client& client,
uint32_t axis) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Not implemented ToVineyardTensor for string type");
}
bl::result<vineyard::ObjectID> ToVineyardDataframe(
const grape::CommSpec& comm_spec, vineyard::Client& client) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Not implemented ToVineyardDataframe for string type");
}
bl::result<std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>>
ToArrowArrays(
const grape::CommSpec& comm_spec,
const std::vector<std::pair<std::string, Selector>>& selectors) override {
auto& frag = ctx_->fragment();
auto& tensor = ctx_->tensor();
std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>
arrow_arrays;
for (auto& pair : selectors) {
auto& col_name = pair.first;
auto& selector = pair.second;
std::shared_ptr<arrow::Array> arr;
switch (selector.type()) {
case SelectorType::kResult: {
arr = std::dynamic_pointer_cast<arrow::Array>(tensor.data());
break;
}
default:
RETURN_GS_ERROR(vineyard::ErrorCode::kUnsupportedOperationError,
"Unsupported operation, available selector type: "
"result. selector: " +
selector.str());
}
arrow_arrays.emplace_back(col_name, arr);
}
return arrow_arrays;
}
private:
std::shared_ptr<IFragmentWrapper> frag_wrapper_;
std::shared_ptr<context_t> ctx_;
};
#ifdef NETWORKX
/**
* @brief This is the specialized TensorContextWrapper for dynamic::Value type
* of oid
* @tparam FRAG_T
* @tparam DATA_T
*/
template <typename FRAG_T, typename DATA_T>
class TensorContextWrapper<
FRAG_T, DATA_T,
typename std::enable_if<std::is_same<DATA_T, dynamic::Value>::value>::type>
: public ITensorContextWrapper {
using fragment_t = FRAG_T;
using oid_t = typename fragment_t::oid_t;
using vertex_t = typename fragment_t::vertex_t;
using context_t = TensorContext<FRAG_T, DATA_T>;
using data_t = DATA_T;
public:
explicit TensorContextWrapper(const std::string& id,
std::shared_ptr<IFragmentWrapper> frag_wrapper,
std::shared_ptr<context_t> ctx)
: ITensorContextWrapper(id),
frag_wrapper_(std::move(frag_wrapper)),
ctx_(std::move(ctx)) {}
std::string context_type() override { return CONTEXT_TYPE_TENSOR; }
std::shared_ptr<IFragmentWrapper> fragment_wrapper() override {
return frag_wrapper_;
}
bl::result<std::unique_ptr<grape::InArchive>> ToNdArray(
const grape::CommSpec& comm_spec, uint32_t axis) override {
auto shape = ctx_->shape();
auto& tensor = ctx_->tensor();
auto arc = std::make_unique<grape::InArchive>();
BOOST_LEAF_AUTO(n_dim, get_n_dim<data_t>(comm_spec, tensor));
if (axis >= n_dim) {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
"Invalid axis " + std::to_string(axis) +
", n-dim: " + std::to_string(n_dim));
}
BOOST_LEAF_AUTO(first_shape, get_non_empty_shape(comm_spec, tensor, axis));
BOOST_LEAF_AUTO(data_type, get_dynamic_type(comm_spec, tensor));
int64_t local_num = shape.empty() ? 0 : shape[axis], total_num;
if (comm_spec.fid() == 0) {
MPI_Reduce(&local_num, &total_num, 1, MPI_INT64_T, MPI_SUM,
comm_spec.worker_id(), comm_spec.comm());
*arc << static_cast<int64_t>(n_dim); // shape size
first_shape[axis] = total_num; // shape after combined
for (auto dim_size : first_shape) {
*arc << static_cast<int64_t>(dim_size);
}
if (data_type == dynamic::Type::kInt32Type) {
*arc << static_cast<int>(vineyard::TypeToInt<int32_t>::value);
} else if (data_type == dynamic::Type::kInt64Type) {
*arc << static_cast<int>(vineyard::TypeToInt<int64_t>::value);
} else if (data_type == dynamic::Type::kDoubleType) {
*arc << static_cast<int>(vineyard::TypeToInt<double>::value);
} else if (data_type == dynamic::Type::kNullType) {
*arc << static_cast<int>(vineyard::TypeToInt<void>::value);
} else if (data_type == dynamic::Type::kStringType) {
*arc << static_cast<int>(vineyard::TypeToInt<std::string>::value);
} else {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Only support int64, double");
}
size_t total_size = first_shape.empty() ? 0 : 1;
for (auto e : first_shape) {
total_size *= e;
}
*arc << static_cast<int64_t>(total_size);
} else {
MPI_Reduce(&local_num, NULL, 1, MPI_INT64_T, MPI_SUM,
comm_spec.FragToWorker(0), comm_spec.comm());
}
auto old_size = arc->GetSize();
*arc << tensor;
gather_archives(*arc, comm_spec, old_size);
return arc;
}
bl::result<std::unique_ptr<grape::InArchive>> ToDataframe(
const grape::CommSpec& comm_spec) override {
auto shape = ctx_->shape();
auto& tensor = ctx_->tensor();
auto arc = std::make_unique<grape::InArchive>();
BOOST_LEAF_AUTO(n_dim, get_n_dim<data_t>(comm_spec, tensor));
if (n_dim != 2) {
RETURN_GS_ERROR(
vineyard::ErrorCode::kInvalidValueError,
"This is not a 2-dims tensor, n-dim: " + std::to_string(n_dim));
}
BOOST_LEAF_AUTO(n_col, get_n_column<data_t>(comm_spec, tensor));
int64_t n_row = shape.empty() ? 0 : shape[0];
int64_t total_n_row;
if (comm_spec.worker_id() == grape::kCoordinatorRank) {
MPI_Reduce(&n_row, &total_n_row, 1, MPI_INT64_T, MPI_SUM,
comm_spec.worker_id(), comm_spec.comm());
*arc << static_cast<int64_t>(n_col);
*arc << static_cast<int64_t>(total_n_row);
} else {
MPI_Reduce(&n_row, NULL, 1, MPI_INT64_T, MPI_SUM,
comm_spec.FragToWorker(0), comm_spec.comm());
}
BOOST_LEAF_AUTO(data_type, get_dynamic_type(comm_spec, tensor));
if (data_type == dynamic::Type::kInt64Type) {
for (size_t col_idx = 0; col_idx < n_col; col_idx++) {
if (comm_spec.worker_id() == grape::kCoordinatorRank) {
*arc << "Col " + std::to_string(col_idx); // Column name
*arc << static_cast<int>(vineyard::TypeToInt<int64_t>::value);
}
auto old_size = arc->GetSize();
for (size_t row_idx = 0; row_idx < n_row; row_idx++) {
auto idx = row_idx * n_col + col_idx;
*arc << tensor.data()[idx].GetInt64();
}
gather_archives(*arc, comm_spec, old_size);
}
} else if (data_type == dynamic::Type::kDoubleType) {
for (size_t col_idx = 0; col_idx < n_col; col_idx++) {
if (comm_spec.worker_id() == grape::kCoordinatorRank) {
*arc << "Col " + std::to_string(col_idx);
*arc << static_cast<int>(vineyard::TypeToInt<double>::value);
}
auto old_size = arc->GetSize();
for (size_t row_idx = 0; row_idx < n_row; row_idx++) {
auto idx = row_idx * n_col + col_idx;
*arc << tensor.data()[idx].GetDouble();
}
gather_archives(*arc, comm_spec, old_size);
}
} else {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Only support int64 or double");
}
return arc;
}
bl::result<vineyard::ObjectID> ToVineyardTensor(
const grape::CommSpec& comm_spec, vineyard::Client& client,
uint32_t axis) override {
auto& frag = ctx_->fragment();
auto& tensor = ctx_->tensor();
auto local_shape = ctx_->shape();
BOOST_LEAF_AUTO(n_dim, get_n_dim<data_t>(comm_spec, tensor));
if (axis >= n_dim) {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
"Invalid axis " + std::to_string(axis) +
", n-dim: " + std::to_string(n_dim));
}
size_t local_num = local_shape.empty() ? 0 : local_shape[axis], total_num;
MPI_Allreduce(&local_num, &total_num, 1, MPI_SIZE_T, MPI_SUM,
comm_spec.comm());
BOOST_LEAF_AUTO(first_shape, get_non_empty_shape(comm_spec, tensor, axis));
BOOST_LEAF_AUTO(data_type, get_dynamic_type(comm_spec, tensor));
first_shape[axis] = total_num; // the shape after combined
if (local_shape.empty()) {
local_shape.resize(n_dim, 0);
}
std::vector<int64_t> partition_index;
for (size_t i = 0; i < n_dim; i++) {
partition_index.push_back(frag.fid());
}
std::vector<int64_t> vy_tensor_shape;
for (auto e : local_shape) {
vy_tensor_shape.push_back(static_cast<int64_t>(e));
}
vineyard::ObjectID tensor_chunk_id;
if (data_type == dynamic::Type::kInt64Type) {
vineyard::TensorBuilder<int64_t> tensor_builder(client, vy_tensor_shape,
partition_index);
for (size_t offset = 0; offset < tensor.size(); offset++) {
tensor_builder.data()[offset] = tensor.data()[offset].GetInt64();
}
auto vy_tensor = std::dynamic_pointer_cast<vineyard::Tensor<int64_t>>(
tensor_builder.Seal(client));
VY_OK_OR_RAISE(vy_tensor->Persist(client));
tensor_chunk_id = vy_tensor->id();
} else if (data_type == dynamic::Type::kDoubleType) {
vineyard::TensorBuilder<double> tensor_builder(client, vy_tensor_shape,
partition_index);
for (size_t offset = 0; offset < tensor.size(); offset++) {
tensor_builder.data()[offset] = tensor.data()[offset].GetDouble();
}
auto vy_tensor = std::dynamic_pointer_cast<vineyard::Tensor<std::string>>(
tensor_builder.Seal(client));
VY_OK_OR_RAISE(vy_tensor->Persist(client));
tensor_chunk_id = vy_tensor->id();
} else {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Only support int64 or double");
}
std::vector<int64_t> global_shape;
std::vector<int64_t> global_partition_shape;
for (auto e : first_shape) {
global_shape.push_back(static_cast<int64_t>(e));
global_partition_shape.push_back(frag.fnum());
}
MPIGlobalTensorBuilder builder(client, comm_spec);
builder.set_shape(global_shape);
builder.set_partition_shape(global_partition_shape);
builder.AddChunk(tensor_chunk_id);
return builder.Seal(client)->id();
}
bl::result<vineyard::ObjectID> ToVineyardDataframe(
const grape::CommSpec& comm_spec, vineyard::Client& client) override {
auto shape = ctx_->shape();
auto& tensor = ctx_->tensor();
auto& frag = ctx_->fragment();
BOOST_LEAF_AUTO(n_dim, get_n_dim<data_t>(comm_spec, tensor));
if (n_dim != 2) {
RETURN_GS_ERROR(
vineyard::ErrorCode::kInvalidValueError,
"This is not a 2-dims tensor, n-dim: " + std::to_string(n_dim));
}
BOOST_LEAF_AUTO(n_col, get_n_column<data_t>(comm_spec, tensor));
size_t n_row = shape.empty() ? 0 : shape[0];
vineyard::DataFrameBuilder df_builder(client);
df_builder.set_partition_index(frag.fid(), 0);
df_builder.set_row_batch_index(frag.fid());
BOOST_LEAF_AUTO(data_type, get_dynamic_type(comm_spec, tensor));
if (data_type == dynamic::Type::kInt64Type) {
for (size_t col_idx = 0; col_idx < n_col; col_idx++) {
std::vector<int64_t> shape{static_cast<int64_t>(n_row)};
auto tensor_builder =
std::make_shared<vineyard::TensorBuilder<int64_t>>(client, shape);
for (auto row_idx = 0; row_idx < n_row; row_idx++) {
auto idx = row_idx * n_col + col_idx;
tensor_builder->data()[row_idx] = tensor.data()[idx].GetInt64();
}
df_builder.AddColumn("Col " + std::to_string(col_idx), tensor_builder);
}
} else if (data_type == dynamic::Type::kDoubleType) {
for (size_t col_idx = 0; col_idx < n_col; col_idx++) {
std::vector<int64_t> shape{static_cast<int64_t>(n_row)};
auto tensor_builder =
std::make_shared<vineyard::TensorBuilder<double>>(client, shape);
for (auto row_idx = 0; row_idx < n_row; row_idx++) {
auto idx = row_idx * n_col + col_idx;
tensor_builder->data()[row_idx] = tensor.data()[idx].GetDouble();
}
df_builder.AddColumn("Col " + std::to_string(col_idx), tensor_builder);
}
} else {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Only support int64 or double");
}
auto df = df_builder.Seal(client);
VY_OK_OR_RAISE(df->Persist(client));
auto df_chunk_id = df->id();
MPIGlobalDataFrameBuilder builder(client, comm_spec);
builder.set_partition_shape(frag.fnum(), n_col);
builder.AddChunk(df_chunk_id);
return builder.Seal(client)->id();
}
bl::result<std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>>
ToArrowArrays(
const grape::CommSpec& comm_spec,
const std::vector<std::pair<std::string, Selector>>& selectors) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Not implemented ToArrowArrays for dynamic type");
}
private:
bl::result<dynamic::Type> get_dynamic_type(
const grape::CommSpec& comm_spec,
const trivial_tensor_t<dynamic::Value>& tensor) {
int type = tensor.size() == 0 ? dynamic::Type::kNullType
: dynamic::GetType(tensor.data()[0]);
std::vector<int> types;
vineyard::GlobalAllGatherv<int>(type, types, comm_spec);
type = dynamic::Type::kNullType;
for (auto e : types) {
if (e != dynamic::Type::kNullType) {
type = e;
break;
}
}
for (auto e : types) {
if (e != dynamic::Type::kNullType && e != type) {
RETURN_GS_ERROR(vineyard::ErrorCode::kIllegalStateError,
"The types of dynamic::Value is not same.");
}
}
return static_cast<dynamic::Type>(type);
}
std::shared_ptr<IFragmentWrapper> frag_wrapper_;
std::shared_ptr<context_t> ctx_;
};
#endif
} // namespace gs
#endif // ANALYTICAL_ENGINE_CORE_CONTEXT_TENSOR_CONTEXT_H_