in interactive_engine/executor/store/global_query/src/store_impl/v6d/native/htap_ds_impl.cc [29:212]
// Fills `handle` with the state of the vineyard ArrowFragmentGroup `id`.
//
// The function:
//   1. connects a fresh vineyard client and fetches the fragment group;
//   2. copies the fragment/label counts into `handle` and initialises the
//      vertex-id and edge-id parsers;
//   3. records the ids of the fragments located on this client's vineyard
//      instance in `handle->local_fragments`;
//   4. constructs every local fragment into the fragment array matching its
//      OID type (int64 / int32 / string), and, from the first local fragment,
//      the vertex map and the interactive schema;
//   5. fills `handle->vertex_chunk_sizes[fid][label]` with
//      ceil(inner_vertex_count / channel_num).
//
// Parameters:
//   id          - object id of the fragment group to load.
//   channel_num - divisor used for the per-label chunk sizes; must be
//                 non-zero whenever chunk sizes are computed.
//   handle      - output structure; must not be null. Its fields are
//                 overwritten.
//
// Unrecoverable conditions (null handle, wrong object type, missing metadata
// keys, unsupported OID type, no local fragment to take the vertex map from,
// zero channel_num, allocation failure) are reported through LOG(FATAL),
// matching the error handling already used in this function.
//
// On success the client is released into `handle->client` as a raw pointer;
// this function does not free it, nor any of the arrays it allocates.
void get_graph_handle(ObjectId id, PartitionId channel_num,
                      GraphHandleImpl* handle) {
#ifndef NDEBUG
  LOG(INFO) << "enter " << __FUNCTION__;
#endif
  if (handle == nullptr) {
    LOG(FATAL) << "get_graph_handle: handle must not be null";
  }

  auto client = std::make_unique<vineyard::Client>();
  VINEYARD_CHECK_OK(client->Connect());
  LOG(INFO) << "Initialize vineyard client";

  std::shared_ptr<vineyard::ArrowFragmentGroup> fg =
      std::dynamic_pointer_cast<vineyard::ArrowFragmentGroup>(
          client->GetObject(id));
  // The cast yields null when the object is missing or of another type;
  // fail with a diagnostic instead of dereferencing a null pointer below.
  if (fg == nullptr) {
    LOG(FATAL) << "Object " << id
               << " could not be fetched as a vineyard::ArrowFragmentGroup";
  }
  LOG(INFO) << "Get vineyard object ok: " << fg;

  vineyard::fid_t total_frag_num = fg->total_frag_num();
  LabelId vertex_label_num = fg->vertex_label_num();
  LabelId edge_label_num = fg->edge_label_num();
  LOG(INFO) << "FragGroup: total frag num = " << total_frag_num
            << ", vertex label num = " << vertex_label_num
            << ", edge label num = " << edge_label_num;

  handle->fnum = total_frag_num;
  handle->vid_parser.Init(total_frag_num, vertex_label_num);
  handle->eid_parser.Init(total_frag_num, edge_label_num);
  handle->channel_num = channel_num;
  handle->vertex_label_num = vertex_label_num;
  handle->edge_label_num = edge_label_num;

  // Count the fragments located on this client's vineyard instance.
  const uint64_t native_instance_id = client->instance_id();
  const auto& frag_locations = fg->FragmentLocations();
  handle->local_fnum = 0;
  for (const auto& pair : frag_locations) {
    if (pair.second == native_instance_id) {
      ++handle->local_fnum;
    }
  }

  // FIXME: make the following exception free
  if (handle->local_fnum == 0) {
    handle->local_fragments = nullptr;
  } else {
    handle->local_fragments = new FRAG_ID_TYPE[handle->local_fnum];
  }
  // Second pass: store the ids of the local fragments. When local_fnum is 0
  // no entry matches, so the null array is never written.
  FRAG_ID_TYPE local_frag_index = 0;
  for (const auto& pair : frag_locations) {
    if (pair.second == native_instance_id) {
      handle->local_fragments[local_frag_index] = pair.first;
      ++local_frag_index;
    }
  }

  handle->schema = nullptr;
  handle->fragments = nullptr;
  handle->int32_fragments = nullptr;
  handle->string_fragments = nullptr;
  handle->vertex_map = nullptr;
  handle->int32_vertex_map = nullptr;
  handle->string_vertex_map = nullptr;
  // Give the OID-type flags a defined value even when no local fragment is
  // found; they are otherwise only assigned inside the loop below.
  handle->use_int64_oid = false;
  handle->use_int32_oid = false;
  handle->use_string_oid = false;

  for (const auto& pair : fg->Fragments()) {
    FRAG_ID_TYPE fid = pair.first;
    const auto frag_location = frag_locations.at(fid);
    LOG(INFO) << "fid = " << fid << ", instance_id = " << native_instance_id
              << ", location = " << frag_location;
    if (frag_location != native_instance_id) {
      continue;  // Only fragments on this instance are constructed.
    }

    vineyard::ObjectMeta meta;
    VINEYARD_CHECK_OK(client->GetMetaData(pair.second, meta));
#ifndef NDEBUG
    LOG(INFO) << "begin construct fragment: " << pair.second << ", "
              << meta.GetTypeName();
#endif

    // The type names are looked up under two key spellings, with and without
    // a trailing underscore.
    std::string oid_type, vid_type;
    if (meta.HasKey("oid_type")) {
      oid_type = meta.GetKeyValue("oid_type");
    } else if (meta.HasKey("oid_type_")) {
      oid_type = meta.GetKeyValue("oid_type_");
    } else {
      LOG(FATAL) << "oid_type not found in meta: " << meta.ToString();
    }
    if (meta.HasKey("vid_type")) {
      vid_type = meta.GetKeyValue("vid_type");
    } else if (meta.HasKey("vid_type_")) {
      vid_type = meta.GetKeyValue("vid_type_");
    } else {
      LOG(FATAL) << "vid_type not found in meta: " << meta.ToString();
    }

    // Construct the fragment in the array matching its OID type. Each array
    // is allocated lazily with total_frag_num slots and indexed by fid.
    if (oid_type == vineyard::type_name<OID_TYPE>()) {
      if (handle->fragments == nullptr) {
        handle->use_int64_oid = true;
        handle->use_int32_oid = false;
        handle->use_string_oid = false;
        handle->fragments = new FRAGMENT_TYPE[total_frag_num];
      }
      handle->fragments[fid].Construct(meta);
    } else if (oid_type == vineyard::type_name<INT32_OID_TYPE>()) {
      if (handle->int32_fragments == nullptr) {
        handle->use_int64_oid = false;
        handle->use_int32_oid = true;
        handle->use_string_oid = false;
        handle->int32_fragments = new INT32_FRAGMENT_TYPE[total_frag_num];
      }
      handle->int32_fragments[fid].Construct(meta);
    } else if (oid_type == vineyard::type_name<STRING_OID_TYPE>()) {
      if (handle->string_fragments == nullptr) {
        handle->use_int64_oid = false;
        handle->use_int32_oid = false;
        handle->use_string_oid = true;
        handle->string_fragments = new STRING_FRAGMENT_TYPE[total_frag_num];
      }
      handle->string_fragments[fid].Construct(meta);
    } else {
      LOG(FATAL) << "Unsupported fragment type: " << meta.GetTypeName()
                 << ", with OID type " << oid_type
                 << ", with VID type " << vid_type;
    }

    // The vertex map is built once, from the first local fragment.
    if (handle->vertex_map == nullptr && handle->int32_vertex_map == nullptr &&
        handle->string_vertex_map == nullptr) {
      vineyard::ObjectID vertex_map_id = vineyard::InvalidObjectID();
      if (handle->fragments != nullptr) {
        vertex_map_id = handle->fragments[fid].vertex_map_id();
      } else if (handle->int32_fragments != nullptr) {
        vertex_map_id = handle->int32_fragments[fid].vertex_map_id();
      } else {
        vertex_map_id = handle->string_fragments[fid].vertex_map_id();
      }
      vineyard::ObjectMeta vm_meta;
      LOG(INFO) << "begin get vertex map: " << vertex_map_id;
      VINEYARD_CHECK_OK(client->GetMetaData(vertex_map_id, vm_meta));
#ifndef NDEBUG
      LOG(INFO) << "begin construct vertex map: " << vertex_map_id;
#endif
      if (handle->fragments != nullptr) {
        handle->vertex_map = new VERTEX_MAP_TYPE();
        handle->vertex_map->Construct(vm_meta);
      } else if (handle->int32_fragments != nullptr) {
        handle->int32_vertex_map = new INT32_VERTEX_MAP_TYPE();
        handle->int32_vertex_map->Construct(vm_meta);
      } else {
        handle->string_vertex_map = new STRING_VERTEX_MAP_TYPE();
        handle->string_vertex_map->Construct(vm_meta);
      }
#ifndef NDEBUG
      LOG(INFO) << "finish construct vertex map: " << pair.second
                << ", vertex map is: " << vertex_map_id;
#endif
    }

    // The schema is likewise derived once, from the first local fragment.
    if (handle->schema == nullptr) {
      vineyard::json schema_json;
      if (handle->fragments != nullptr) {
        handle->fragments[fid].schema().ToJSON(schema_json);
      } else if (handle->int32_fragments != nullptr) {
        handle->int32_fragments[fid].schema().ToJSON(schema_json);
      } else {
        handle->string_fragments[fid].schema().ToJSON(schema_json);
      }
      vineyard::htap::MGPropertyGraphSchema mgschema;
      mgschema.FromJSON(schema_json);
      handle->schema = new vineyard::htap::MGPropertyGraphSchema(
          mgschema.TransformToInteractive());
    }
  }

  // The chunk-size computation below queries a vertex map and divides by
  // channel_num. Validate both, but only when the loops will actually run, so
  // inputs that completed before still complete.
  const bool computes_chunk_sizes = total_frag_num > 0 && vertex_label_num > 0;
  if (computes_chunk_sizes) {
    if (channel_num == 0) {
      LOG(FATAL) << "get_graph_handle: channel_num must be non-zero";
    }
    if (handle->vertex_map == nullptr && handle->int32_vertex_map == nullptr &&
        handle->string_vertex_map == nullptr) {
      LOG(FATAL) << "No fragment of group " << id
                 << " is located on vineyard instance " << native_instance_id
                 << "; no vertex map is available to compute chunk sizes";
    }
  }

  // vertex_chunk_sizes stays malloc-allocated, as in the original layout.
  // malloc may return null for a zero-byte request, so a null result is only
  // an error when a non-zero size was requested.
  handle->vertex_chunk_sizes =
      static_cast<VID_TYPE**>(malloc(sizeof(VID_TYPE*) * total_frag_num));
  if (handle->vertex_chunk_sizes == nullptr && total_frag_num > 0) {
    LOG(FATAL) << "get_graph_handle: failed to allocate vertex_chunk_sizes";
  }
  for (vineyard::fid_t i = 0; i < total_frag_num; ++i) {
    handle->vertex_chunk_sizes[i] =
        static_cast<VID_TYPE*>(malloc(sizeof(VID_TYPE) * vertex_label_num));
    if (handle->vertex_chunk_sizes[i] == nullptr && vertex_label_num > 0) {
      LOG(FATAL) << "get_graph_handle: failed to allocate vertex_chunk_sizes["
                 << i << "]";
    }
    for (LabelId j = 0; j < vertex_label_num; ++j) {
      // Pick whichever vertex map was actually constructed, so a null map is
      // never dereferenced.
      VID_TYPE ivnum;
      if (handle->vertex_map != nullptr) {
        ivnum = handle->vertex_map->GetInnerVertexSize(i, j);
      } else if (handle->int32_vertex_map != nullptr) {
        ivnum = handle->int32_vertex_map->GetInnerVertexSize(i, j);
      } else {
        ivnum = handle->string_vertex_map->GetInnerVertexSize(i, j);
      }
      // Ceiling division: vertices per channel, rounded up.
      handle->vertex_chunk_sizes[i][j] =
          (ivnum + channel_num - 1) / channel_num;
    }
  }

  // Hand the connected client over to the handle as a raw pointer.
  handle->client = client.release();
  LOG(INFO) << "finish get graph handle: " << id << ", handle = " << handle;
}