std::string DoImport()

in cli/src/importer.h [215:505]


std::string DoImport(const py::dict& config_dict) {
  auto import_config = ConvertPyDictToConfig(config_dict);

  auto version =
      graphar::InfoVersion::Parse(import_config.graphar_config.version).value();
  fs::path save_path = import_config.graphar_config.path;

  std::unordered_map<std::string, graphar::IdType> vertex_chunk_sizes;
  std::unordered_map<std::string, int64_t> vertex_counts;

  std::map<std::pair<std::string, std::string>,
           std::unordered_map<std::shared_ptr<arrow::Scalar>, graphar::IdType,
                              arrow::Scalar::Hash, arrow::Scalar::PtrsEqual>>
      vertex_prop_index_map;

  std::unordered_map<std::string, std::vector<std::string>>
      vertex_props_in_edges;
  std::map<std::pair<std::string, std::string>, graphar::Property>
      vertex_prop_property_map;
  for (const auto& edge : import_config.import_schema.edges) {
    vertex_props_in_edges[edge.src_type].emplace_back(edge.src_prop);
    vertex_props_in_edges[edge.dst_type].emplace_back(edge.dst_prop);
  }
  for (const auto& vertex : import_config.import_schema.vertices) {
    vertex_chunk_sizes[vertex.type] = vertex.chunk_size;

    auto pgs = std::vector<std::shared_ptr<graphar::PropertyGroup>>();
    std::string primary_key;
    for (const auto& pg : vertex.property_groups) {
      std::vector<graphar::Property> props;
      for (const auto& prop : pg.properties) {
        if (prop.is_primary) {
          if (!primary_key.empty()) {
            throw std::runtime_error("Multiple primary keys found in vertex " +
                                     vertex.type);
          }
          primary_key = prop.name;
        }
        graphar::Property property(
            prop.name, graphar::DataType::TypeNameToDataType(prop.data_type),
            prop.is_primary, prop.nullable);
        props.emplace_back(property);
        vertex_prop_property_map[std::make_pair(vertex.type, prop.name)] =
            property;
      }
      // TODO: add prefix parameter in config
      auto property_group = graphar::CreatePropertyGroup(
          props, graphar::StringToFileType(pg.file_type));
      pgs.emplace_back(property_group);
    }

    auto vertex_info =
        graphar::CreateVertexInfo(vertex.type, vertex.chunk_size, pgs,
                                  vertex.labels, vertex.prefix, version);
    auto file_name = vertex.type + ".vertex.yml";
    vertex_info->Save(save_path / file_name);
    auto save_path_str = save_path.string();
    save_path_str += "/";
    auto vertex_prop_writer = graphar::VertexPropertyWriter::Make(
                                  vertex_info, save_path_str,
                                  StringToValidateLevel(vertex.validate_level))
                                  .value();

    std::vector<std::shared_ptr<arrow::Table>> vertex_tables;
    for (const auto& source : vertex.sources) {
      std::vector<std::string> column_names;
      for (const auto& [key, value] : source.columns) {
        column_names.emplace_back(key);
      }
      auto table = GetDataFromFile(source.path, column_names, source.delimiter,
                                   source.file_type);

      std::unordered_map<std::string, Property> column_prop_map;
      std::unordered_map<std::string, std::string> reversed_columns_config;
      for (const auto& [key, value] : source.columns) {
        reversed_columns_config[value] = key;
      }
      for (const auto& pg : vertex.property_groups) {
        for (const auto& prop : pg.properties) {
          column_prop_map[reversed_columns_config[prop.name]] = prop;
        }
      }
      std::unordered_map<
          std::string, std::pair<std::string, std::shared_ptr<arrow::DataType>>>
          columns_to_change;
      for (const auto& [column, prop] : column_prop_map) {
        auto arrow_data_type = graphar::DataType::DataTypeToArrowDataType(
            graphar::DataType::TypeNameToDataType(prop.data_type));
        auto arrow_column = table->GetColumnByName(column);
        // TODO: whether need to check duplicate values for primary key?
        if (!prop.nullable) {
          for (const auto& chunk : arrow_column->chunks()) {
            if (chunk->null_count() > 0) {
              throw std::runtime_error("Non-nullable column '" + column +
                                       "' has null values");
            }
          }
        }
        // TODO: check this
        if (column != prop.name ||
            arrow_column->type()->id() != arrow_data_type->id()) {
          columns_to_change[column] =
              std::make_pair(prop.name, arrow_data_type);
        }
      }
      table = ChangeNameAndDataType(table, columns_to_change);
      vertex_tables.emplace_back(table);
    }
    std::shared_ptr<arrow::Table> merged_vertex_table =
        MergeTables(vertex_tables);
    // TODO: check all fields in props
    // TODO: add start_index in config
    graphar::IdType start_chunk_index = 0;

    auto vertex_table_with_index =
        vertex_prop_writer
            ->AddIndexColumn(merged_vertex_table, start_chunk_index,
                             vertex_info->GetChunkSize())
            .value();
    for (const auto& property_group : pgs) {
      vertex_prop_writer->WriteTable(vertex_table_with_index, property_group,
                                     start_chunk_index);
    }
    if (vertex_props_in_edges.find(vertex.type) !=
        vertex_props_in_edges.end()) {
      for (const auto& vertex_prop : vertex_props_in_edges[vertex.type]) {
        vertex_prop_index_map[std::make_pair(vertex.type, vertex_prop)] =
            TableToUnorderedMap(vertex_table_with_index, vertex_prop,
                                graphar::GeneralParams::kVertexIndexCol);
      }
    }
    auto vertex_count = merged_vertex_table->num_rows();
    vertex_counts[vertex.type] = vertex_count;
    vertex_prop_writer->WriteVerticesNum(vertex_count);
  }

  for (const auto& edge : import_config.import_schema.edges) {
    auto pgs = std::vector<std::shared_ptr<graphar::PropertyGroup>>();

    for (const auto& pg : edge.property_groups) {
      std::vector<graphar::Property> props;
      for (const auto& prop : pg.properties) {
        props.emplace_back(graphar::Property(
            prop.name, graphar::DataType::TypeNameToDataType(prop.data_type),
            prop.is_primary, prop.nullable));
      }
      // TODO: add prefix parameter in config
      auto property_group = graphar::CreatePropertyGroup(
          props, graphar::StringToFileType(pg.file_type));
      pgs.emplace_back(property_group);
    }
    graphar::AdjacentListVector adj_lists;
    for (const auto& adj_list : edge.adj_lists) {
      // TODO: add prefix parameter in config
      adj_lists.emplace_back(graphar::CreateAdjacentList(
          graphar::OrderedAlignedToAdjListType(adj_list.ordered,
                                               adj_list.aligned_by),
          graphar::StringToFileType(adj_list.file_type)));
    }

    // TODO: add directed parameter in config

    bool directed = true;
    // TODO: whether prefix has default value?

    auto edge_info = graphar::CreateEdgeInfo(
        edge.src_type, edge.edge_type, edge.dst_type, edge.chunk_size,
        vertex_chunk_sizes[edge.src_type], vertex_chunk_sizes[edge.dst_type],
        directed, adj_lists, pgs, edge.prefix, version);
    auto file_name =
        ConcatEdgeTriple(edge.src_type, edge.edge_type, edge.dst_type) +
        ".edge.yml";
    edge_info->Save(save_path / file_name);
    auto save_path_str = save_path.string();
    save_path_str += "/";
    for (const auto& adj_list : adj_lists) {
      int64_t vertex_count;
      if (adj_list->GetType() == graphar::AdjListType::ordered_by_source ||
          adj_list->GetType() == graphar::AdjListType::unordered_by_source) {
        vertex_count = vertex_counts[edge.src_type];
      } else {
        vertex_count = vertex_counts[edge.dst_type];
      }
      std::vector<std::shared_ptr<arrow::Table>> edge_tables;

      for (const auto& source : edge.sources) {
        std::vector<std::string> column_names;
        for (const auto& [key, value] : source.columns) {
          column_names.emplace_back(key);
        }
        auto table = GetDataFromFile(source.path, column_names,
                                     source.delimiter, source.file_type);
        std::unordered_map<std::string, graphar::Property> column_prop_map;
        std::unordered_map<std::string, std::string> reversed_columns;
        for (const auto& [key, value] : source.columns) {
          reversed_columns[value] = key;
        }

        for (const auto& pg : edge.property_groups) {
          for (const auto& prop : pg.properties) {
            column_prop_map[reversed_columns[prop.name]] = graphar::Property(
                prop.name,
                graphar::DataType::TypeNameToDataType(prop.data_type),
                prop.is_primary, prop.nullable);
          }
        }
        column_prop_map[reversed_columns.at(edge.src_prop)] =
            vertex_prop_property_map.at(
                std::make_pair(edge.src_type, edge.src_prop));
        column_prop_map[reversed_columns.at(edge.dst_prop)] =
            vertex_prop_property_map.at(
                std::make_pair(edge.dst_type, edge.dst_prop));
        std::unordered_map<
            std::string,
            std::pair<std::string, std::shared_ptr<arrow::DataType>>>
            columns_to_change;
        for (const auto& [column, prop] : column_prop_map) {
          auto arrow_data_type =
              graphar::DataType::DataTypeToArrowDataType(prop.type);
          auto arrow_column = table->GetColumnByName(column);
          // TODO: is needed?
          if (!prop.is_nullable) {
            for (const auto& chunk : arrow_column->chunks()) {
              if (chunk->null_count() > 0) {
                throw std::runtime_error("Non-nullable column '" + column +
                                         "' has null values");
              }
            }
          }
          if (column != prop.name ||
              arrow_column->type()->id() != arrow_data_type->id()) {
            columns_to_change[column] =
                std::make_pair(prop.name, arrow_data_type);
          }
        }
        table = ChangeNameAndDataType(table, columns_to_change);
        edge_tables.emplace_back(table);
      }
      std::unordered_map<
          std::string, std::pair<std::string, std::shared_ptr<arrow::DataType>>>
          vertex_columns_to_change;

      std::shared_ptr<arrow::Table> merged_edge_table =
          MergeTables(edge_tables);
      // TODO: check all fields in props

      auto combined_edge_table =
          merged_edge_table->CombineChunks().ValueOrDie();

      auto edge_builder =
          graphar::builder::EdgesBuilder::Make(
              edge_info, save_path_str, adj_list->GetType(), vertex_count,
              StringToValidateLevel(edge.validate_level))
              .value();

      std::vector<std::string> edge_column_names;
      for (const auto& field : combined_edge_table->schema()->fields()) {
        edge_column_names.push_back(field->name());
      }
      const int64_t num_rows = combined_edge_table->num_rows();
      for (int64_t i = 0; i < num_rows; ++i) {
        auto edge_src_column =
            combined_edge_table->GetColumnByName(edge.src_prop);
        auto edge_dst_column =
            combined_edge_table->GetColumnByName(edge.dst_prop);

        graphar::builder::Edge e(
            vertex_prop_index_map
                .at(std::make_pair(edge.src_type, edge.src_prop))
                .at(edge_src_column->GetScalar(i).ValueOrDie()),
            vertex_prop_index_map
                .at(std::make_pair(edge.dst_type, edge.dst_prop))
                .at(edge_dst_column->GetScalar(i).ValueOrDie()));
        for (const auto& column_name : edge_column_names) {
          if (column_name != edge.src_prop && column_name != edge.dst_prop) {
            auto column = combined_edge_table->GetColumnByName(column_name);
            auto column_type = column->type();
            std::any value;
            TryToCastToAny(
                graphar::DataType::ArrowDataTypeToDataType(column_type),
                column->chunk(0), value);
            e.AddProperty(column_name, value);
          }
        }
        edge_builder->AddEdge(e);
      }
      edge_builder->Dump();
    }
  }
  return "Imported successfully!";
}