inline uint32_t FishStore::BatchInsert()

in src/core/fishstore.h [776:947]


inline uint32_t FishStore<D, A>::BatchInsert(const char* data, size_t length,
    uint64_t monotonic_serial_num, uint32_t internal_offset) {
  // Parses a batch of raw records with the calling thread's parser, evaluates
  // every general and inline PSF registered for that parser on each record,
  // and issues one Insert per parsed record.
  //
  // @param data                 buffer holding the raw records.
  // @param length               number of valid bytes in `data`.
  // @param monotonic_serial_num serial number forwarded to every Insert.
  // @param internal_offset      byte offset handed to A::Load; presumably the
  //                             position in `data` where parsing starts — confirm.
  // @return number of Insert operations issued (one per parsed record). Inserts
  //         may still be Pending on return; completion is not awaited here.

#ifdef _TIMER
  auto t_start = std::chrono::high_resolution_clock::now();
#endif

  // Nothing left to parse. `>=` (rather than `==`) also refuses an offset past
  // the end of the buffer instead of handing it to the parser.
  if (internal_offset >= length) return 0;

  // Fetch the thread-local parser and parser record batch.
  const ParserState& parser_state = parser_states[parser_ctx().parser_no];
  A::Load(parser_ctx().parser, data, length, internal_offset);

  // Reserve up front so insert_contexts, kpts, the field map and the PSF
  // argument buffer do not reallocate while the batch is built.
  std::vector<RecordInsertContext> insert_contexts;
  insert_contexts.reserve(Constants::kDefaultBatchSize);
  const size_t max_kpt_size =
    parser_state.ptr_general_psf.size() + parser_state.ptr_inline_psf.size();
  const size_t field_size = parser_state.main_parser_fields.size();
  // `field_map` maps the field ID registered in the naming service to the
  // parsed-out value of the current record; it is cleared and reused per record.
  tsl::hopscotch_map<uint16_t, typename A::field_t> field_map(field_size);
  std::vector<typename A::field_t> psf_args;
  psf_args.reserve(field_size);

  // Fills `psf_args` with the current record's values for `fields`, in order.
  // Returns false as soon as one field is missing from the record, in which
  // case the PSF is skipped for this record.
  auto collect_psf_args = [&field_map, &psf_args](const auto& fields) {
    psf_args.clear();
    for(const auto& field_id : fields) {
      auto it = field_map.find(field_id);
      if(it == field_map.end()) return false;
      psf_args.emplace_back(it->second);
    }
    return true;
  };

#ifdef _TIMER
  auto t_parse_start = std::chrono::high_resolution_clock::now();
#endif

  uint32_t current_offset = internal_offset;
  // Build one insert context per record: the raw payload plus everything
  // needed to construct all of its key pointers (kpts).
  while(A::HasNext(parser_ctx().parser)) {
    auto& record = A::NextRecord(parser_ctx().parser);
    // Full record payload; its bytes are [rec_begin, rec_end).
    auto rec_ref = record.GetRawText();
    const uint32_t rec_size = static_cast<uint32_t>(rec_ref.Length());
    const auto rec_begin = rec_ref.Data();
    const auto rec_end = rec_begin + rec_ref.Length();
    // Each context carries the running offset just past the end of its record.
    current_offset += rec_size;
    insert_contexts.emplace_back(RecordInsertContext{rec_begin, rec_size, current_offset});
    RecordInsertContext& insert_context = insert_contexts.back();

    // Populate the field map with every field the parser produced for this record.
    field_map.clear();
    for(auto& field : record.GetFields()) {
      field_map.emplace(parser_state.main_parser_field_ids[field.FieldId()], field);
    }

    // Key pointer utilities. For the KPTUtil definition, see `internal_context.h`.
    std::vector<KPTUtil>& kpts = insert_context.kpts();
    kpts.reserve(max_kpt_size);
    std::vector<NullableStringRef>& options = insert_context.options();

#ifdef _TIMER
    auto t_fields_start = std::chrono::high_resolution_clock::now();
#endif

    // Values that cannot be referenced inside the payload are laid out after
    // it; `optional_offset` is where the next such value goes.
    uint32_t optional_offset = rec_size;
    uint16_t n_general_pts = 0;
    for(auto general_psf_id : parser_state.ptr_general_psf) {
      auto& psf = general_psf_map[general_psf_id];
      if(!collect_psf_args(psf.fields)) continue;

      // All fields are present: evaluate the PSF. Projections (lib_id == -1)
      // are called directly so the call can be inlined.
      NullableStringRef res;
      if(psf.lib_id == -1) res = projection<A>(psf_args);
      else res = psf.eval_(psf_args);
      if(res.is_null) continue;

      const uint64_t hash = Utility::HashBytesWithPSFID(general_psf_id, res.payload, res.size);
      const uint32_t value_size = res.size;
      // Reference the value in place only if ALL of its bytes lie inside the
      // record. A value that starts inside but runs past the end, or lives
      // elsewhere entirely, is appended to the optional area instead.
      const bool value_in_record = res.payload >= rec_begin && res.payload <= rec_end &&
        static_cast<size_t>(rec_end - res.payload) >= res.size;
      uint32_t value_offset;
      if(value_in_record) {
        value_offset = static_cast<uint32_t>(res.payload - rec_begin);
      } else {
        value_offset = optional_offset;
        optional_offset += res.size;
        options.emplace_back(res);
      }
      kpts.emplace_back(KPTUtil{ hash, general_psf_id, value_offset, value_size });
      ++n_general_pts;
    }
    insert_context.set_n_general_pts(n_general_pts);
    insert_context.set_optional_size(optional_offset - rec_size);

#ifdef _TIMER
    auto t_fields_end = std::chrono::high_resolution_clock::now();
    tot_fields_time += std::chrono::duration<double>(t_fields_end - t_fields_start).count();
    auto t_preds_start = std::chrono::high_resolution_clock::now();
#endif

    // Inline PSFs produce an integer value; a non-null result adds a key pointer.
    for(auto inline_psf_id : parser_state.ptr_inline_psf) {
      auto& psf = inline_psf_map[inline_psf_id];
      if(!collect_psf_args(psf.fields)) continue;
      NullableInt res = psf.eval_(psf_args);
      if(!res.is_null) {
        kpts.emplace_back(KPTUtil{inline_psf_id, res.value});
      }
    }

#ifdef _TIMER
    auto t_preds_end = std::chrono::high_resolution_clock::now();
    tot_preds_time += std::chrono::duration<double>(t_preds_end - t_preds_start).count();
#endif
  }

#ifdef _TIMER
  auto t_parse_end = std::chrono::high_resolution_clock::now();
  tot_prep_time += std::chrono::duration<double>(t_parse_end - t_parse_start).count();
#endif

  // Completion callback for inserts that go Pending; identical for every
  // record, so it is defined once outside the insert loop.
  auto callback = [](IAsyncContext* ctxt, Status result) {
    // NOTE(review): wrapping `ctxt` in CallbackContext presumably releases the
    // copied insert context when `context` goes out of scope — confirm.
    CallbackContext<RecordInsertContext> context{ctxt};
    assert(result == Status::Ok);
    (void)result;  // only read by the assert
  };

  // Finish preparing insert contexts, start populating the store.
  uint32_t op_cnt = 0;
  for(auto& insert_context : insert_contexts) {
#ifdef _TIMER
    auto t_insert_start = std::chrono::high_resolution_clock::now();
#endif

    auto status = Insert(insert_context, callback, monotonic_serial_num, insert_context.offset());

#ifdef _TIMER
    auto t_insert_end = std::chrono::high_resolution_clock::now();
    tot_insert_time += std::chrono::duration<double>(t_insert_end - t_insert_start).count();
#endif

    assert(status == Status::Ok || status == Status::Pending);
    (void)status;  // only read by the assert
    ++op_cnt;
  }
  // Pending inserts are intentionally not drained here.
  //CompletePending(true);

#ifdef _TIMER
  auto t_end = std::chrono::high_resolution_clock::now();
  tot_cpu_time += std::chrono::duration<double>(t_end - t_start).count();
#endif

  return op_cnt;

}