void addEdgesRecordBatchImplHelper()

in flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.h [772:1031]


  // Loads every edge of the (src_label_id) -[e_label_id]-> (dst_label_id)
  // triplet from `e_files` and hands the result to
  // basic_fragment_loader_.PutEdges<EDATA_T, VECTOR_T>().
  //
  // For each file, `supplier_creator` returns a set of record-batch suppliers.
  // One producer thread per supplier pushes batches into a shared bounded
  // queue. Up to min(8 * #suppliers, thread_num) consumer threads pop the
  // batches and call _append_edges() with the src/dst columns and the
  // src/dst vertex indexers. Each consumer writes into its own buffer in
  // parsed_edges_vec and bumps the shared atomic in/out degree counters.
  // When EDATA_T is RecordView, the property columns are also written into
  // the CSR's property table at a globally reserved row offset.
  //
  // Batches are expected to carry the src key column at index 0 and the dst
  // key column at index 1, followed by the edge property columns.
  //
  // @param src_label_id      source vertex label.
  // @param dst_label_id      destination vertex label.
  // @param e_label_id        edge label.
  // @param e_files           input files. They are processed one after
  //                          another, and all threads of a file are joined
  //                          before the next file starts.
  // @param supplier_creator  builds the batch suppliers for one file. The
  //                          last argument is the number of per-thread
  //                          buffers; at most that many suppliers may be
  //                          returned.
  void addEdgesRecordBatchImplHelper(
      label_t src_label_id, label_t dst_label_id, label_t e_label_id,
      const std::vector<std::string>& e_files,
      std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
          label_t, label_t, label_t, const std::string&, const LoadingConfig&,
          int)>
          supplier_creator) {
    auto src_label_name = schema_.get_vertex_label_name(src_label_id);
    auto dst_label_name = schema_.get_vertex_label_name(dst_label_id);
    auto edge_label_name = schema_.get_edge_label_name(e_label_id);
    auto edge_column_mappings = loading_config_.GetEdgeColumnMappings(
        src_label_id, dst_label_id, e_label_id);
    auto src_dst_col_pair = loading_config_.GetEdgeSrcDstCol(
        src_label_id, dst_label_id, e_label_id);
    if (src_dst_col_pair.first.size() != 1 ||
        src_dst_col_pair.second.size() != 1) {
      LOG(FATAL) << "We currently only support one src primary key and one "
                    "dst primary key";
    }
    size_t src_col_ind = src_dst_col_pair.first[0].second;
    size_t dst_col_ind = src_dst_col_pair.second[0].second;
    CHECK(src_col_ind != dst_col_ind);

    check_edge_invariant(schema_, edge_column_mappings, src_col_ind,
                         dst_col_ind, src_label_id, dst_label_id, e_label_id);

    // hardware_concurrency() may return 0 when the value is not computable.
    // With zero buffers no consumer thread would ever drain the bounded
    // queue and the producers would block forever, so use at least one.
    const unsigned thread_num =
        std::max(1u, std::thread::hardware_concurrency());

    const auto& src_indexer = basic_fragment_loader_.GetLFIndexer(src_label_id);
    const auto& dst_indexer = basic_fragment_loader_.GetLFIndexer(dst_label_id);
    // One edge buffer per consumer thread, indexed by the thread's index.
    std::vector<VECTOR_T> parsed_edges_vec(thread_num);
    if constexpr (std::is_same_v<
                      VECTOR_T,
                      mmap_vector<std::tuple<vid_t, vid_t, EDATA_T>>> ||
                  std::is_same_v<
                      VECTOR_T,
                      mmap_vector<std::tuple<vid_t, vid_t, size_t>>>) {
      // mmap-backed buffers are file-backed under the runtime dir; they are
      // unlinked once PutEdges has consumed them (see the end of this
      // function).
      const auto& work_dir = basic_fragment_loader_.work_dir();
      for (unsigned i = 0; i < thread_num; ++i) {
        parsed_edges_vec[i].open(runtime_dir(work_dir) + "/" + src_label_name +
                                 "_" + dst_label_name + "_" + edge_label_name +
                                 "_" + std::to_string(i) + ".tmp");
        parsed_edges_vec[i].reserve(4096);
      }
    }
    std::vector<std::atomic<int32_t>> ie_degree(dst_indexer.size()),
        oe_degree(src_indexer.size());
    for (size_t idx = 0; idx < ie_degree.size(); ++idx) {
      ie_degree[idx].store(0);
    }
    for (size_t idx = 0; idx < oe_degree.size(); ++idx) {
      oe_degree[idx].store(0);
    }
    VLOG(10) << "src indexer size: " << src_indexer.size()
             << " dst indexer size: " << dst_indexer.size();

    grape::BlockingQueue<std::shared_ptr<arrow::RecordBatch>> queue;
    queue.SetLimit(1024);

    // Keep a reference to every string column seen by a producer, so the
    // string buffers are not released when their record batch is dropped.
    // They are held until PutEdges has run. Indexed by producer index.
    std::vector<std::vector<std::shared_ptr<arrow::Array>>> string_columns(
        thread_num);

    if constexpr (std::is_same<EDATA_T, RecordView>::value) {
      basic_fragment_loader_.init_edge_table(src_label_id, dst_label_id,
                                             e_label_id);
    }

    // Next free row in the RecordView property table, shared by all files.
    std::atomic<size_t> offset(0);
    // Guards resizing the property table (exclusive) against concurrent
    // column reads and writes (shared).
    std::shared_mutex rw_mutex;
    for (const auto& filename : e_files) {
      auto record_batch_supplier_vec =
          supplier_creator(src_label_id, dst_label_id, e_label_id, filename,
                           loading_config_, parsed_edges_vec.size());
      // Producer idx indexes string_columns, which has thread_num slots.
      CHECK(record_batch_supplier_vec.size() <= thread_num)
          << "too many record batch suppliers: "
          << record_batch_supplier_vec.size() << " > " << thread_num;

      const size_t consumer_num =
          std::min(8 * record_batch_supplier_vec.size(),
                   static_cast<size_t>(thread_num));

      // Threads are scoped to a single file. Reusing one vector across files
      // would join already-joined threads on the second file, and
      // std::thread::join throws std::system_error when that happens.
      std::vector<std::thread> work_threads;
      work_threads.reserve(record_batch_supplier_vec.size() + consumer_num);

      queue.SetProducerNum(record_batch_supplier_vec.size());

      for (size_t i = 0; i < record_batch_supplier_vec.size(); ++i) {
        work_threads.emplace_back(
            [&](int idx) {
              auto& string_column = string_columns[idx];
              bool first_batch = true;
              auto& record_batch_supplier = record_batch_supplier_vec[idx];
              while (true) {
                auto record_batch = record_batch_supplier->GetNextBatch();
                if (!record_batch) {
                  queue.DecProducerNum();
                  break;
                }
                if (first_batch) {
                  // Validate once per supplier: src + dst + properties.
                  auto header = record_batch->schema()->field_names();
                  auto schema_column_names = schema_.get_edge_property_names(
                      src_label_id, dst_label_id, e_label_id);
                  CHECK(schema_column_names.size() + 2 == header.size())
                      << "schema size: " << schema_column_names.size()
                      << " neq header size: " << header.size();
                  first_batch = false;
                }
                for (auto col_i = 0; col_i < record_batch->num_columns();
                     ++col_i) {
                  const auto& column = record_batch->column(col_i);
                  if (column->type()->Equals(arrow::utf8()) ||
                      column->type()->Equals(arrow::large_utf8())) {
                    string_column.emplace_back(column);
                  }
                }

                queue.Put(record_batch);
              }
            },
            i);
      }
      for (size_t i = 0; i < consumer_num; ++i) {
        work_threads.emplace_back(
            [&](int idx) {
              // copy the table to csr.
              auto& parsed_edges = parsed_edges_vec[idx];
              while (true) {
                std::shared_ptr<arrow::RecordBatch> record_batch{nullptr};
                auto ret = queue.Get(record_batch);
                if (!ret) {
                  // Queue drained and every producer has finished.
                  break;
                }
                if (!record_batch) {
                  LOG(FATAL) << "get nullptr batch";
                }
                auto columns = record_batch->columns();
                // We assume the src_col and dst_col will always be put
                // at front.
                CHECK(columns.size() >= 2);
                auto src_col = columns[0];
                auto dst_col = columns[1];
                auto src_col_type = src_col->type();
                auto dst_col_type = dst_col->type();
                CHECK(check_primary_key_type(src_col_type))
                    << "unsupported src_col type: " << src_col_type->ToString();
                CHECK(check_primary_key_type(dst_col_type))
                    << "unsupported dst_col type: " << dst_col_type->ToString();

                std::vector<std::shared_ptr<arrow::Array>> property_cols;
                for (size_t col_i = 2; col_i < columns.size(); ++col_i) {
                  property_cols.emplace_back(columns[col_i]);
                }
                size_t offset_i = 0;
                if constexpr (std::is_same<EDATA_T, RecordView>::value) {
                  auto casted_csr = dynamic_cast<DualCsr<RecordView>*>(
                      basic_fragment_loader_.get_csr(src_label_id, dst_label_id,
                                                     e_label_id));
                  CHECK(casted_csr != nullptr);
                  // NOTE(review): `auto` copies the Table returned by
                  // GetTable(). This relies on the copy sharing its column
                  // storage with the CSR's table. Confirm that, or bind by
                  // reference.
                  auto table = casted_csr->GetTable();
                  CHECK(table.col_num() == property_cols.size());
                  const size_t batch_len =
                      static_cast<size_t>(src_col->length());
                  // Reserve a contiguous row range for this batch.
                  offset_i = offset.fetch_add(batch_len);
                  std::vector<size_t> offsets;
                  offsets.reserve(batch_len);
                  for (size_t row = 0; row < batch_len; ++row) {
                    offsets.emplace_back(offset_i + row);
                  }

                  // Grow the table geometrically so it covers the reserved
                  // range. The size is read under the shared lock because
                  // another thread may be resizing under the exclusive lock.
                  const size_t required_rows =
                      std::max<size_t>(offset_i + batch_len, 1);
                  bool need_grow = false;
                  {
                    std::shared_lock<std::shared_mutex> lock(rw_mutex);
                    need_grow = table.row_num() < required_rows;
                  }
                  if (need_grow) {
                    std::unique_lock<std::shared_mutex> lock(rw_mutex);
                    size_t row_num = std::max<size_t>(table.row_num(), 1);
                    while (row_num < required_rows) {
                      row_num *= 2;
                    }
                    // Re-check: another thread may already have grown it.
                    if (row_num > table.row_num()) {
                      table.resize(row_num);
                    }
                  }

                  {
                    // Shared lock: writers touch disjoint rows, and only a
                    // resize needs exclusive access.
                    std::shared_lock<std::shared_mutex> lock(rw_mutex);
                    for (size_t col_i = 0; col_i < table.col_num(); ++col_i) {
                      auto col = table.get_column_by_id(col_i);
                      auto chunked_array =
                          std::make_shared<arrow::ChunkedArray>(
                              property_cols[col_i]);
                      set_properties_column(col.get(), chunked_array, offsets);
                    }
                  }
                }
                auto edge_property = schema_.get_edge_property(
                    src_label_id, dst_label_id, e_label_id);
                // Edges without properties carry only the src/dst columns,
                // so property_cols may be empty. Indexing [0] would then be
                // out of bounds; pass an explicit null array instead.
                std::shared_ptr<arrow::Array> edata_col =
                    property_cols.empty() ? nullptr : property_cols[0];
                // add edges to vector
                CHECK(src_col->length() == dst_col->length());
                if (src_col_type->Equals(arrow::int64())) {
                  _append_edges<int64_t, EDATA_T, VECTOR_T>(
                      src_col, dst_col, src_indexer, dst_indexer, edata_col,
                      edge_property, parsed_edges, ie_degree, oe_degree,
                      offset_i);
                } else if (src_col_type->Equals(arrow::uint64())) {
                  _append_edges<uint64_t, EDATA_T, VECTOR_T>(
                      src_col, dst_col, src_indexer, dst_indexer, edata_col,
                      edge_property, parsed_edges, ie_degree, oe_degree,
                      offset_i);
                } else if (src_col_type->Equals(arrow::int32())) {
                  _append_edges<int32_t, EDATA_T, VECTOR_T>(
                      src_col, dst_col, src_indexer, dst_indexer, edata_col,
                      edge_property, parsed_edges, ie_degree, oe_degree,
                      offset_i);
                } else if (src_col_type->Equals(arrow::uint32())) {
                  _append_edges<uint32_t, EDATA_T, VECTOR_T>(
                      src_col, dst_col, src_indexer, dst_indexer, edata_col,
                      edge_property, parsed_edges, ie_degree, oe_degree,
                      offset_i);
                } else {
                  // must be string
                  _append_edges<std::string_view, EDATA_T, VECTOR_T>(
                      src_col, dst_col, src_indexer, dst_indexer, edata_col,
                      edge_property, parsed_edges, ie_degree, oe_degree,
                      offset_i);
                }
              }
            },
            i);
      }

      for (auto& t : work_threads) {
        t.join();
      }
      VLOG(10) << "Finish parsing edge file:" << filename << " for label "
               << src_label_name << " -> " << dst_label_name << " -> "
               << edge_label_name;
    }
    VLOG(10) << "Finish parsing edge file:" << e_files.size() << " for label "
             << src_label_name << " -> " << dst_label_name << " -> "
             << edge_label_name;
    // Snapshot the atomic counters into plain vectors for PutEdges.
    std::vector<int32_t> ie_deg(ie_degree.size());
    std::vector<int32_t> oe_deg(oe_degree.size());
    for (size_t idx = 0; idx < ie_deg.size(); ++idx) {
      ie_deg[idx] = ie_degree[idx];
    }
    for (size_t idx = 0; idx < oe_deg.size(); ++idx) {
      oe_deg[idx] = oe_degree[idx];
    }

    basic_fragment_loader_.PutEdges<EDATA_T, VECTOR_T>(
        src_label_id, dst_label_id, e_label_id, parsed_edges_vec, ie_deg,
        oe_deg, build_csr_in_mem_);

    // The string data is no longer referenced once PutEdges has run.
    string_columns.clear();
    size_t sum = 0;
    for (auto& edges : parsed_edges_vec) {
      sum += edges.size();
      if constexpr (
          std::is_same<VECTOR_T,
                       mmap_vector<std::tuple<vid_t, vid_t, EDATA_T>>>::value ||
          std::is_same<VECTOR_T,
                       mmap_vector<std::tuple<vid_t, vid_t, size_t>>>::value) {
        edges.unlink();
      }
    }

    VLOG(10) << "Finish putting: " << sum << " edges";
  }