void AbstractArrowFragmentLoader::AddEdgesRecordBatch()

in flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.cc [254:450]


void AbstractArrowFragmentLoader::AddEdgesRecordBatch(
    label_t src_label_i, label_t dst_label_i, label_t edge_label_i,
    const std::vector<std::string>& filenames,
    std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
        label_t, label_t, label_t, const std::string&, const LoadingConfig&,
        int)>
        supplier_creator) {
  auto src_label_name = schema_.get_vertex_label_name(src_label_i);
  auto dst_label_name = schema_.get_vertex_label_name(dst_label_i);
  auto edge_label_name = schema_.get_edge_label_name(edge_label_i);
  if (filenames.size() <= 0) {
    LOG(FATAL) << "No edge files found for src label: " << src_label_name
               << " dst label: " << dst_label_name
               << " edge label: " << edge_label_name;
  }
  if (filenames.size() <= 0) {
    LOG(FATAL) << "No edge files found for src label: " << src_label_name
               << " dst label: " << dst_label_name
               << " edge label: " << edge_label_name;
  }
  VLOG(10) << "Init edges src label: " << src_label_name
           << " dst label: " << dst_label_name
           << " edge label: " << edge_label_name
           << " filenames: " << filenames.size();
  auto& property_types = schema_.get_edge_properties(
      src_label_name, dst_label_name, edge_label_name);
  size_t col_num = property_types.size();
  EdgeStrategy oe_strategy = schema_.get_outgoing_edge_strategy(
      src_label_name, dst_label_name, edge_label_name);
  EdgeStrategy ie_strategy = schema_.get_incoming_edge_strategy(
      src_label_name, dst_label_name, edge_label_name);
  bool oe_mutable = schema_.outgoing_edge_mutable(
      src_label_name, dst_label_name, edge_label_name);
  bool ie_mutable = schema_.incoming_edge_mutable(
      src_label_name, dst_label_name, edge_label_name);
  if (col_num == 0) {
    auto dual_csr = new DualCsr<grape::EmptyType>(oe_strategy, ie_strategy,
                                                  oe_mutable, ie_mutable);
    basic_fragment_loader_.set_csr(src_label_i, dst_label_i, edge_label_i,
                                   dual_csr);
    if (filenames.empty()) {
      basic_fragment_loader_.AddNoPropEdgeBatch<grape::EmptyType>(
          src_label_i, dst_label_i, edge_label_i);
    } else {
      addEdgesRecordBatchImpl<grape::EmptyType>(
          src_label_i, dst_label_i, edge_label_i, filenames, supplier_creator);
    }
  } else if (col_num == 1) {
    if (property_types[0] == PropertyType::kBool) {
      auto dual_csr =
          new DualCsr<bool>(oe_strategy, ie_strategy, oe_mutable, ie_mutable);
      basic_fragment_loader_.set_csr(src_label_i, dst_label_i, edge_label_i,
                                     dual_csr);
      if (filenames.empty()) {
        basic_fragment_loader_.AddNoPropEdgeBatch<bool>(
            src_label_i, dst_label_i, edge_label_i);
      } else {
        addEdgesRecordBatchImpl<bool>(src_label_i, dst_label_i, edge_label_i,
                                      filenames, supplier_creator);
      }
    } else if (property_types[0] == PropertyType::kDate) {
      auto dual_csr =
          new DualCsr<Date>(oe_strategy, ie_strategy, oe_mutable, ie_mutable);
      basic_fragment_loader_.set_csr(src_label_i, dst_label_i, edge_label_i,
                                     dual_csr);

      if (filenames.empty()) {
        basic_fragment_loader_.AddNoPropEdgeBatch<Date>(
            src_label_i, dst_label_i, edge_label_i);
      } else {
        addEdgesRecordBatchImpl<Date>(src_label_i, dst_label_i, edge_label_i,
                                      filenames, supplier_creator);
      }
    } else if (property_types[0] == PropertyType::kInt32) {
      auto dual_csr = new DualCsr<int32_t>(oe_strategy, ie_strategy, oe_mutable,
                                           ie_mutable);
      basic_fragment_loader_.set_csr(src_label_i, dst_label_i, edge_label_i,
                                     dual_csr);

      if (filenames.empty()) {
        basic_fragment_loader_.AddNoPropEdgeBatch<int32_t>(
            src_label_i, dst_label_i, edge_label_i);
      } else {
        addEdgesRecordBatchImpl<int32_t>(src_label_i, dst_label_i, edge_label_i,
                                         filenames, supplier_creator);
      }
    } else if (property_types[0] == PropertyType::kUInt32) {
      auto dual_csr = new DualCsr<uint32_t>(oe_strategy, ie_strategy,
                                            oe_mutable, ie_mutable);
      basic_fragment_loader_.set_csr(src_label_i, dst_label_i, edge_label_i,
                                     dual_csr);

      if (filenames.empty()) {
        basic_fragment_loader_.AddNoPropEdgeBatch<uint32_t>(
            src_label_i, dst_label_i, edge_label_i);
      } else {
        addEdgesRecordBatchImpl<uint32_t>(src_label_i, dst_label_i,
                                          edge_label_i, filenames,
                                          supplier_creator);
      }
    } else if (property_types[0] == PropertyType::kInt64) {
      auto dual_csr = new DualCsr<int64_t>(oe_strategy, ie_strategy, oe_mutable,
                                           ie_mutable);
      basic_fragment_loader_.set_csr(src_label_i, dst_label_i, edge_label_i,
                                     dual_csr);

      if (filenames.empty()) {
        basic_fragment_loader_.AddNoPropEdgeBatch<int64_t>(
            src_label_i, dst_label_i, edge_label_i);
      } else {
        addEdgesRecordBatchImpl<int64_t>(src_label_i, dst_label_i, edge_label_i,
                                         filenames, supplier_creator);
      }
    } else if (property_types[0] == PropertyType::kUInt64) {
      auto dual_csr = new DualCsr<uint64_t>(oe_strategy, ie_strategy,
                                            oe_mutable, ie_mutable);
      basic_fragment_loader_.set_csr(src_label_i, dst_label_i, edge_label_i,
                                     dual_csr);

      if (filenames.empty()) {
        basic_fragment_loader_.AddNoPropEdgeBatch<uint64_t>(
            src_label_i, dst_label_i, edge_label_i);
      } else {
        addEdgesRecordBatchImpl<uint64_t>(src_label_i, dst_label_i,
                                          edge_label_i, filenames,
                                          supplier_creator);
      }
    } else if (property_types[0] == PropertyType::kDouble) {
      auto dual_csr =
          new DualCsr<double>(oe_strategy, ie_strategy, oe_mutable, ie_mutable);
      basic_fragment_loader_.set_csr(src_label_i, dst_label_i, edge_label_i,
                                     dual_csr);
      if (filenames.empty()) {
        basic_fragment_loader_.AddNoPropEdgeBatch<double>(
            src_label_i, dst_label_i, edge_label_i);
      } else {
        addEdgesRecordBatchImpl<double>(src_label_i, dst_label_i, edge_label_i,
                                        filenames, supplier_creator);
      }
    } else if (property_types[0] == PropertyType::kFloat) {
      auto dual_csr =
          new DualCsr<float>(oe_strategy, ie_strategy, oe_mutable, ie_mutable);
      basic_fragment_loader_.set_csr(src_label_i, dst_label_i, edge_label_i,
                                     dual_csr);

      if (filenames.empty()) {
        basic_fragment_loader_.AddNoPropEdgeBatch<float>(
            src_label_i, dst_label_i, edge_label_i);
      } else {
        addEdgesRecordBatchImpl<float>(src_label_i, dst_label_i, edge_label_i,
                                       filenames, supplier_creator);
      }
    } else if (property_types[0].type_enum ==
                   impl::PropertyTypeImpl::kVarChar ||
               property_types[0].type_enum ==
                   impl::PropertyTypeImpl::kStringView) {
      // Both varchar and string are treated as string. For String, we use the
      // default max length defined in PropertyType::GetStringDefaultMaxLength()
      uint16_t max_length = PropertyType::GetStringDefaultMaxLength();
      if (property_types[0].type_enum == impl::PropertyTypeImpl::kVarChar) {
        max_length = property_types[0].additional_type_info.max_length;
      }
      auto dual_csr = new DualCsr<std::string_view>(
          oe_strategy, ie_strategy, max_length, oe_mutable, ie_mutable);
      basic_fragment_loader_.set_csr(src_label_i, dst_label_i, edge_label_i,
                                     dual_csr);
      if (filenames.empty()) {
        basic_fragment_loader_.AddNoPropEdgeBatch<std::string_view>(
            src_label_i, dst_label_i, edge_label_i);
      } else {
        addEdgesRecordBatchImpl<std::string_view>(src_label_i, dst_label_i,
                                                  edge_label_i, filenames,
                                                  supplier_creator);
      }
    } else {
      LOG(FATAL) << "Unsupported edge property type." << property_types[0];
    }
  } else {
    const auto& props = schema_.get_edge_properties(
        src_label_name, dst_label_name, edge_label_name);
    const auto& prop_names = schema_.get_edge_property_names(
        src_label_name, dst_label_name, edge_label_name);
    auto dual_csr =
        new DualCsr<RecordView>(oe_strategy, ie_strategy, prop_names, props, {},
                                ie_mutable, oe_mutable);
    basic_fragment_loader_.set_csr(src_label_i, dst_label_i, edge_label_i,
                                   dual_csr);
    if (filenames.empty()) {
      LOG(FATAL) << "No edge files found for src label: " << src_label_name
                 << " dst label: " << dst_label_name
                 << " edge label: " << edge_label_name;
    } else {
      addEdgesRecordBatchImpl<RecordView>(
          src_label_i, dst_label_i, edge_label_i, filenames, supplier_creator);
    }
  }
}