in cli/src/importer.h [215:505]
// Imports the graph described by `config_dict` into GraphAr format.
//
// Vertices (processed first, because edges depend on their index maps):
//   * writes `<type>.vertex.yml` under the configured output path;
//   * reads every configured source, renames/casts its columns to the declared
//     properties via ChangeNameAndDataType, and merges the sources;
//   * writes each property group and the vertex count;
//   * for properties referenced as edge endpoints, records a
//     (property value -> vertex index) map built from the written table.
// Edges:
//   * writes `<ConcatEdgeTriple(src, edge, dst)>.edge.yml`;
//   * reads and normalizes the edge sources once, then builds and dumps one
//     adjacency list per configured `adj_list`, translating endpoint keys to
//     vertex indices through the maps recorded above.
//
// Parameters:
//   config_dict - Python dict converted by ConvertPyDictToConfig.
// Returns:
//   "Imported successfully!" once every file has been written.
// Throws:
//   std::runtime_error for invalid configuration, including:
//     - multiple primary keys;
//     - undefined vertex types;
//     - empty source lists;
//     - non-nullable properties that are missing or contain nulls.
//   std::runtime_error when a GraphAr save/write/build call returns a non-OK
//   status.
//   std::out_of_range (from `.at()`) when an edge key column is not configured
//   or an edge endpoint value is not present in the vertex table.
std::string DoImport(const py::dict& config_dict) {
  auto import_config = ConvertPyDictToConfig(config_dict);
  auto version =
      graphar::InfoVersion::Parse(import_config.graphar_config.version).value();
  fs::path save_path = import_config.graphar_config.path;
  // Output prefix handed to the writers/builders (with a trailing '/').
  const std::string save_path_str = save_path.string() + "/";

  // GraphAr reports failures through graphar::Status; surface them instead of
  // returning a success message for a partially written graph.
  const auto check_status = [](const graphar::Status& status,
                               const std::string& context) {
    if (!status.ok()) {
      throw std::runtime_error(context + ": " + status.message());
    }
  };

  std::unordered_map<std::string, graphar::IdType> vertex_chunk_sizes;
  std::unordered_map<std::string, int64_t> vertex_counts;
  // (vertex type, property) -> (property value -> vertex index column value).
  std::map<std::pair<std::string, std::string>,
           std::unordered_map<std::shared_ptr<arrow::Scalar>, graphar::IdType,
                              arrow::Scalar::Hash, arrow::Scalar::PtrsEqual>>
      vertex_prop_index_map;
  // vertex type -> properties used as edge endpoints (may contain repeats).
  std::unordered_map<std::string, std::vector<std::string>>
      vertex_props_in_edges;
  // (vertex type, property name) -> GraphAr property definition.
  std::map<std::pair<std::string, std::string>, graphar::Property>
      vertex_prop_property_map;
  for (const auto& edge : import_config.import_schema.edges) {
    vertex_props_in_edges[edge.src_type].emplace_back(edge.src_prop);
    vertex_props_in_edges[edge.dst_type].emplace_back(edge.dst_prop);
  }

  // ---------------------------------------------------------------- vertices
  for (const auto& vertex : import_config.import_schema.vertices) {
    vertex_chunk_sizes[vertex.type] = vertex.chunk_size;
    auto pgs = std::vector<std::shared_ptr<graphar::PropertyGroup>>();
    std::string primary_key;
    for (const auto& pg : vertex.property_groups) {
      std::vector<graphar::Property> props;
      for (const auto& prop : pg.properties) {
        if (prop.is_primary) {
          if (!primary_key.empty()) {
            throw std::runtime_error("Multiple primary keys found in vertex " +
                                     vertex.type);
          }
          primary_key = prop.name;
        }
        graphar::Property property(
            prop.name, graphar::DataType::TypeNameToDataType(prop.data_type),
            prop.is_primary, prop.nullable);
        props.emplace_back(property);
        vertex_prop_property_map[std::make_pair(vertex.type, prop.name)] =
            property;
      }
      // TODO: add prefix parameter in config
      auto property_group = graphar::CreatePropertyGroup(
          props, graphar::StringToFileType(pg.file_type));
      pgs.emplace_back(property_group);
    }
    auto vertex_info =
        graphar::CreateVertexInfo(vertex.type, vertex.chunk_size, pgs,
                                  vertex.labels, vertex.prefix, version);
    if (!vertex_info) {
      throw std::runtime_error("Failed to create vertex info for vertex " +
                               vertex.type);
    }
    auto file_name = vertex.type + ".vertex.yml";
    check_status(vertex_info->Save(save_path / file_name),
                 "Failed to save " + file_name);
    auto vertex_prop_writer =
        graphar::VertexPropertyWriter::Make(
            vertex_info, save_path_str,
            StringToValidateLevel(vertex.validate_level))
            .value();

    if (vertex.sources.empty()) {
      throw std::runtime_error("No sources configured for vertex " +
                               vertex.type);
    }
    std::vector<std::shared_ptr<arrow::Table>> vertex_tables;
    for (const auto& source : vertex.sources) {
      // `source.columns` maps file column -> property name; keep the file
      // columns to read and the inverse mapping to resolve properties.
      std::vector<std::string> column_names;
      std::unordered_map<std::string, std::string> reversed_columns_config;
      for (const auto& [key, value] : source.columns) {
        column_names.emplace_back(key);
        reversed_columns_config[value] = key;
      }
      auto table = GetDataFromFile(source.path, column_names, source.delimiter,
                                   source.file_type);
      std::unordered_map<std::string, Property> column_prop_map;
      for (const auto& pg : vertex.property_groups) {
        for (const auto& prop : pg.properties) {
          auto it = reversed_columns_config.find(prop.name);
          if (it == reversed_columns_config.end()) {
            // Only a nullable property may be absent from a source.
            if (!prop.nullable) {
              throw std::runtime_error("Non-nullable property '" + prop.name +
                                       "' of vertex " + vertex.type +
                                       " has no column in its source");
            }
            continue;
          }
          column_prop_map[it->second] = prop;
        }
      }
      std::unordered_map<
          std::string, std::pair<std::string, std::shared_ptr<arrow::DataType>>>
          columns_to_change;
      for (const auto& [column, prop] : column_prop_map) {
        auto arrow_data_type = graphar::DataType::DataTypeToArrowDataType(
            graphar::DataType::TypeNameToDataType(prop.data_type));
        auto arrow_column = table->GetColumnByName(column);
        if (!arrow_column) {
          throw std::runtime_error("Column '" + column +
                                   "' not found in source of vertex " +
                                   vertex.type);
        }
        // TODO: whether need to check duplicate values for primary key?
        if (!prop.nullable && arrow_column->null_count() > 0) {
          throw std::runtime_error("Non-nullable column '" + column +
                                   "' has null values");
        }
        // TODO: check this
        if (column != prop.name ||
            arrow_column->type()->id() != arrow_data_type->id()) {
          columns_to_change[column] =
              std::make_pair(prop.name, arrow_data_type);
        }
      }
      vertex_tables.emplace_back(
          ChangeNameAndDataType(table, columns_to_change));
    }
    std::shared_ptr<arrow::Table> merged_vertex_table =
        MergeTables(vertex_tables);
    // TODO: check all fields in props
    // TODO: add start_index in config
    graphar::IdType start_chunk_index = 0;
    auto vertex_table_with_index =
        vertex_prop_writer
            ->AddIndexColumn(merged_vertex_table, start_chunk_index,
                             vertex_info->GetChunkSize())
            .value();
    for (const auto& property_group : pgs) {
      check_status(
          vertex_prop_writer->WriteTable(vertex_table_with_index,
                                         property_group, start_chunk_index),
          "Failed to write a property group of vertex " + vertex.type);
    }
    auto props_it = vertex_props_in_edges.find(vertex.type);
    if (props_it != vertex_props_in_edges.end()) {
      for (const auto& vertex_prop : props_it->second) {
        vertex_prop_index_map[std::make_pair(vertex.type, vertex_prop)] =
            TableToUnorderedMap(vertex_table_with_index, vertex_prop,
                                graphar::GeneralParams::kVertexIndexCol);
      }
    }
    auto vertex_count = merged_vertex_table->num_rows();
    vertex_counts[vertex.type] = vertex_count;
    check_status(vertex_prop_writer->WriteVerticesNum(vertex_count),
                 "Failed to write the vertex count of vertex " + vertex.type);
  }

  // ------------------------------------------------------------------- edges
  for (const auto& edge : import_config.import_schema.edges) {
    auto src_chunk_it = vertex_chunk_sizes.find(edge.src_type);
    if (src_chunk_it == vertex_chunk_sizes.end()) {
      throw std::runtime_error("Edge " + edge.edge_type +
                               " references undefined vertex type " +
                               edge.src_type);
    }
    auto dst_chunk_it = vertex_chunk_sizes.find(edge.dst_type);
    if (dst_chunk_it == vertex_chunk_sizes.end()) {
      throw std::runtime_error("Edge " + edge.edge_type +
                               " references undefined vertex type " +
                               edge.dst_type);
    }
    auto pgs = std::vector<std::shared_ptr<graphar::PropertyGroup>>();
    for (const auto& pg : edge.property_groups) {
      std::vector<graphar::Property> props;
      for (const auto& prop : pg.properties) {
        props.emplace_back(graphar::Property(
            prop.name, graphar::DataType::TypeNameToDataType(prop.data_type),
            prop.is_primary, prop.nullable));
      }
      // TODO: add prefix parameter in config
      auto property_group = graphar::CreatePropertyGroup(
          props, graphar::StringToFileType(pg.file_type));
      pgs.emplace_back(property_group);
    }
    graphar::AdjacentListVector adj_lists;
    for (const auto& adj_list : edge.adj_lists) {
      // TODO: add prefix parameter in config
      adj_lists.emplace_back(graphar::CreateAdjacentList(
          graphar::OrderedAlignedToAdjListType(adj_list.ordered,
                                               adj_list.aligned_by),
          graphar::StringToFileType(adj_list.file_type)));
    }
    // TODO: add directed parameter in config
    bool directed = true;
    // TODO: whether prefix has default value?
    auto edge_info = graphar::CreateEdgeInfo(
        edge.src_type, edge.edge_type, edge.dst_type, edge.chunk_size,
        src_chunk_it->second, dst_chunk_it->second, directed, adj_lists, pgs,
        edge.prefix, version);
    if (!edge_info) {
      throw std::runtime_error("Failed to create edge info for edge " +
                               edge.edge_type);
    }
    auto file_name =
        ConcatEdgeTriple(edge.src_type, edge.edge_type, edge.dst_type) +
        ".edge.yml";
    check_status(edge_info->Save(save_path / file_name),
                 "Failed to save " + file_name);
    if (adj_lists.empty()) {
      // No adjacency lists configured, so there is no edge data to write.
      continue;
    }

    // Load and normalize the edge sources once; every adjacency list below is
    // built from the same table.
    if (edge.sources.empty()) {
      throw std::runtime_error("No sources configured for edge " +
                               edge.edge_type);
    }
    std::vector<std::shared_ptr<arrow::Table>> edge_tables;
    for (const auto& source : edge.sources) {
      // `source.columns` maps file column -> property name.
      std::vector<std::string> column_names;
      std::unordered_map<std::string, std::string> reversed_columns;
      for (const auto& [key, value] : source.columns) {
        column_names.emplace_back(key);
        reversed_columns[value] = key;
      }
      auto table = GetDataFromFile(source.path, column_names, source.delimiter,
                                   source.file_type);
      std::unordered_map<std::string, graphar::Property> column_prop_map;
      for (const auto& pg : edge.property_groups) {
        for (const auto& prop : pg.properties) {
          auto it = reversed_columns.find(prop.name);
          if (it == reversed_columns.end()) {
            // Only a nullable property may be absent from a source.
            if (!prop.nullable) {
              throw std::runtime_error("Non-nullable property '" + prop.name +
                                       "' of edge " + edge.edge_type +
                                       " has no column in its source");
            }
            continue;
          }
          column_prop_map[it->second] = graphar::Property(
              prop.name, graphar::DataType::TypeNameToDataType(prop.data_type),
              prop.is_primary, prop.nullable);
        }
      }
      // Endpoint key columns take the type of the referenced vertex property
      // so their values match the keys of vertex_prop_index_map.
      column_prop_map[reversed_columns.at(edge.src_prop)] =
          vertex_prop_property_map.at(
              std::make_pair(edge.src_type, edge.src_prop));
      column_prop_map[reversed_columns.at(edge.dst_prop)] =
          vertex_prop_property_map.at(
              std::make_pair(edge.dst_type, edge.dst_prop));
      std::unordered_map<
          std::string, std::pair<std::string, std::shared_ptr<arrow::DataType>>>
          columns_to_change;
      for (const auto& [column, prop] : column_prop_map) {
        auto arrow_data_type =
            graphar::DataType::DataTypeToArrowDataType(prop.type);
        auto arrow_column = table->GetColumnByName(column);
        if (!arrow_column) {
          throw std::runtime_error("Column '" + column +
                                   "' not found in source of edge " +
                                   edge.edge_type);
        }
        // TODO: is needed?
        if (!prop.is_nullable && arrow_column->null_count() > 0) {
          throw std::runtime_error("Non-nullable column '" + column +
                                   "' has null values");
        }
        if (column != prop.name ||
            arrow_column->type()->id() != arrow_data_type->id()) {
          columns_to_change[column] =
              std::make_pair(prop.name, arrow_data_type);
        }
      }
      edge_tables.emplace_back(ChangeNameAndDataType(table, columns_to_change));
    }
    // TODO: check all fields in props
    auto combined_edge_table =
        MergeTables(edge_tables)->CombineChunks().ValueOrDie();
    const int64_t num_rows = combined_edge_table->num_rows();

    // Row-invariant lookups, hoisted out of the per-row loop.
    const auto& src_index_map = vertex_prop_index_map.at(
        std::make_pair(edge.src_type, edge.src_prop));
    const auto& dst_index_map = vertex_prop_index_map.at(
        std::make_pair(edge.dst_type, edge.dst_prop));
    auto edge_src_column = combined_edge_table->GetColumnByName(edge.src_prop);
    auto edge_dst_column = combined_edge_table->GetColumnByName(edge.dst_prop);
    if (!edge_src_column || !edge_dst_column) {
      throw std::runtime_error("Edge " + edge.edge_type +
                               " is missing its source or destination key "
                               "column");
    }
    // Every non-key column becomes an edge property. After CombineChunks each
    // column of a non-empty table holds exactly one chunk.
    struct EdgePropertyColumn {
      std::string name;
      std::shared_ptr<arrow::Array> values;
      std::shared_ptr<graphar::DataType> type;
    };
    std::vector<EdgePropertyColumn> property_columns;
    if (num_rows > 0) {
      for (int c = 0; c < combined_edge_table->num_columns(); ++c) {
        const std::string& column_name = combined_edge_table->field(c)->name();
        if (column_name == edge.src_prop || column_name == edge.dst_prop) {
          continue;
        }
        auto column = combined_edge_table->column(c);
        property_columns.push_back(
            {column_name, column->chunk(0),
             graphar::DataType::ArrowDataTypeToDataType(column->type())});
      }
    }

    for (const auto& adj_list : adj_lists) {
      const auto adj_list_type = adj_list->GetType();
      const bool by_source =
          adj_list_type == graphar::AdjListType::ordered_by_source ||
          adj_list_type == graphar::AdjListType::unordered_by_source;
      const int64_t vertex_count =
          vertex_counts.at(by_source ? edge.src_type : edge.dst_type);
      auto edge_builder =
          graphar::builder::EdgesBuilder::Make(
              edge_info, save_path_str, adj_list_type, vertex_count,
              StringToValidateLevel(edge.validate_level))
              .value();
      for (int64_t i = 0; i < num_rows; ++i) {
        graphar::builder::Edge e(
            src_index_map.at(edge_src_column->GetScalar(i).ValueOrDie()),
            dst_index_map.at(edge_dst_column->GetScalar(i).ValueOrDie()));
        for (auto& property : property_columns) {
          std::any value;
          // Hand the cast helper a one-element slice holding row i; the
          // unsliced chunk is the same for every row and cannot yield a
          // per-row value. NOTE(review): assumes TryToCastToAny reads element
          // 0 of the array it is given — confirm against its definition.
          TryToCastToAny(property.type, property.values->Slice(i, 1), value);
          e.AddProperty(property.name, value);
        }
        check_status(edge_builder->AddEdge(e),
                     "Failed to add an edge of " + edge.edge_type);
      }
      check_status(edge_builder->Dump(),
                   "Failed to dump edges of " + edge.edge_type);
    }
  }
  return "Imported successfully!";
}