in src/core/fishstore.h [776:947]
inline uint32_t FishStore<D, A>::BatchInsert(const char* data, size_t length,
    uint64_t monotonic_serial_num, uint32_t internal_offset) {
  // Parses a batch of records and inserts each of them into the store.
  //
  // `data`, `length` and `internal_offset` are handed to `A::Load` on the
  // calling thread's parser; every record the parser then yields is turned
  // into a `RecordInsertContext` (payload + key pointer info) and passed to
  // `Insert` together with `monotonic_serial_num`.
  //
  // Returns the number of `Insert` calls issued, or 0 right away when
  // `length == internal_offset`.
#ifdef _TIMER
  auto t_start = std::chrono::high_resolution_clock::now();
#endif

  if (length == internal_offset) return 0;

  // Fetch the thread-local parser and parser record batch.
  const ParserState& parser_state = parser_states[parser_ctx().parser_no];
  A::Load(parser_ctx().parser, data, length, internal_offset);

  // Make reservation for insert_contexts, kpts, field map, PSF arguments
  // to prevent re-allocation.
  std::vector<RecordInsertContext> insert_contexts;
  insert_contexts.reserve(Constants::kDefaultBatchSize);
  const size_t max_kpt_size =
    parser_state.ptr_general_psf.size() + parser_state.ptr_inline_psf.size();
  const size_t field_size = parser_state.main_parser_fields.size();

  // `field_map` maps the field ID registered in naming service to parsed out
  // value. It is cleared and refilled for every record.
  tsl::hopscotch_map<uint16_t, typename A::field_t> field_map(field_size);
  // Scratch argument list, cleared and refilled for every PSF evaluation.
  std::vector<typename A::field_t> psf_args;
  psf_args.reserve(field_size);

#ifdef _TIMER
  auto t_parse_start = std::chrono::high_resolution_clock::now();
#endif

  // Running offset: advanced by each record's raw-text length *before* it is
  // stored, so every context carries `internal_offset` plus the lengths of
  // all records up to and including its own.
  uint32_t current_offset = internal_offset;

  // Iterate through records and fields so as to construct insert context for
  // each record. Insert context contains the payload and the corresponding
  // info to build all key pointers, stored in kpts.
  while (A::HasNext(parser_ctx().parser)) {
    auto& record = A::NextRecord(parser_ctx().parser);

    // Get the full record payload.
    auto rec_ref = record.GetRawText();
    auto rec_begin = rec_ref.Data();
    auto rec_end = rec_begin + rec_ref.Length();
    const uint32_t rec_length = static_cast<uint32_t>(rec_ref.Length());

    current_offset += rec_length;
    insert_contexts.emplace_back(RecordInsertContext{
      rec_begin, rec_length, current_offset});
    // Only used within this iteration, i.e. before the next emplace_back.
    RecordInsertContext& insert_context = insert_contexts.back();

    // Iterate through all parsed out fields to populate the field map for
    // this record.
    field_map.clear();
    for (auto& field : record.GetFields()) {
      field_map.emplace(parser_state.main_parser_field_ids[field.FieldId()], field);
    }

    // Construct key pointer utilities. For KPTUtil definition, pls refer to
    // `internal_context.h`
    std::vector<KPTUtil>& kpts = insert_context.kpts();
    kpts.reserve(max_kpt_size);
    std::vector<NullableStringRef>& options = insert_context.options();

#ifdef _TIMER
    auto t_fields_start = std::chrono::high_resolution_clock::now();
#endif

    // Values that do not live inside the record text are collected in
    // `options`; their offsets are assigned consecutively, starting at the
    // record length.
    uint32_t optional_offset = rec_length;
    uint16_t n_general_pts = 0;

    for (auto general_psf_id : parser_state.ptr_general_psf) {
      psf_args.clear();
      auto& psf = general_psf_map[general_psf_id];

      // Gather the PSF's arguments; skip the PSF if the record lacks any of
      // the fields it needs.
      bool missing_fields = false;
      for (auto& field_id : psf.fields) {
        auto it = field_map.find(field_id);
        if (it == field_map.end()) {
          missing_fields = true;
          break;
        }
        psf_args.emplace_back(it->second);
      }
      if (missing_fields) continue;

      // If all the fields are found and predicate evaluates true, we build a
      // key pointer for this predicate on the record.
      NullableStringRef res;
      // Get the benefit of inlining....
      if (psf.lib_id == -1) res = projection<A>(psf_args);
      else res = psf.eval_(psf_args);
      if (res.is_null) continue;

      const uint64_t hash =
        Utility::HashBytesWithPSFID(general_psf_id, res.payload, res.size);
      const uint32_t value_size = res.size;

      // The value is addressed inside the record only when the *whole* range
      // [payload, payload + size) lies within the record text. Checking just
      // the start pointer (inclusive of one-past-the-end) would misclassify a
      // non-empty value that begins exactly at the record's end, or one that
      // runs past it, and hand out an offset outside the record text.
      const bool starts_in_record =
        rec_begin <= res.payload && res.payload <= rec_end;
      const bool value_in_record = starts_in_record &&
        static_cast<uint64_t>(res.size) <=
          static_cast<uint64_t>(rec_end - res.payload);

      uint32_t value_offset;
      if (value_in_record) {
        value_offset = static_cast<uint32_t>(res.payload - rec_begin);
      } else {
        value_offset = optional_offset;
        optional_offset += res.size;
        options.emplace_back(res);
      }

      kpts.emplace_back(KPTUtil{ hash, general_psf_id, value_offset, value_size });
      ++n_general_pts;
    }

    insert_context.set_n_general_pts(n_general_pts);
    // Total size of the values that were routed to `options`.
    insert_context.set_optional_size(optional_offset - rec_length);

#ifdef _TIMER
    auto t_fields_end = std::chrono::high_resolution_clock::now();
    tot_fields_time += std::chrono::duration<double>(t_fields_end - t_fields_start).count();
    auto t_preds_start = std::chrono::high_resolution_clock::now();
#endif

    for (auto inline_psf_id : parser_state.ptr_inline_psf) {
      psf_args.clear();
      auto& psf = inline_psf_map[inline_psf_id];

      // Same argument gathering as above; a PSF with missing fields yields
      // no key pointer.
      bool missing_fields = false;
      for (auto& field_id : psf.fields) {
        auto it = field_map.find(field_id);
        if (it == field_map.end()) {
          missing_fields = true;
          break;
        }
        psf_args.emplace_back(it->second);
      }
      if (missing_fields) continue;

      NullableInt res = psf.eval_(psf_args);
      if (!res.is_null) {
        kpts.emplace_back(KPTUtil{inline_psf_id, res.value});
      }
    }

#ifdef _TIMER
    auto t_preds_end = std::chrono::high_resolution_clock::now();
    tot_preds_time += std::chrono::duration<double>(t_preds_end - t_preds_start).count();
#endif
  }

#ifdef _TIMER
  auto t_parse_end = std::chrono::high_resolution_clock::now();
  tot_prep_time += std::chrono::duration<double>(t_parse_end - t_parse_start).count();
#endif

  // Finish preparing insert contexts, start populating the store.
  // The callback captures nothing, so a single instance serves every insert.
  auto callback = [](IAsyncContext* ctxt, Status result) {
    CallbackContext<RecordInsertContext> context{ctxt};
    assert(result == Status::Ok);
    static_cast<void>(result);  // only read by the assert above
  };

  uint32_t op_cnt = 0;
  for (auto& insert_context : insert_contexts) {
#ifdef _TIMER
    auto t_insert_start = std::chrono::high_resolution_clock::now();
#endif
    auto status = Insert(insert_context, callback, monotonic_serial_num, insert_context.offset());
#ifdef _TIMER
    auto t_insert_end = std::chrono::high_resolution_clock::now();
    tot_insert_time += std::chrono::duration<double>(t_insert_end - t_insert_start).count();
#endif
    assert(status == Status::Ok || status == Status::Pending);
    static_cast<void>(status);  // only read by the assert above
    ++op_cnt;
  }
  //CompletePending(true);

#ifdef _TIMER
  auto t_end = std::chrono::high_resolution_clock::now();
  tot_cpu_time += std::chrono::duration<double>(t_end - t_start).count();
#endif

  return op_cnt;
}