in be/src/exec/hdfs-scanner.cc [347:567]
// Generates an LLVM IR function named "WriteCompleteTuple" for the scan described by
// 'node' and hands it back through 'write_complete_tuple_fn'.
//
// The generated function takes eight arguments (this, pool, fields, tuple, tuple_row,
// template, error_fields, error_in_row) and returns a bool. Its body, in order:
//   1. calls the function produced by CodegenInitTuple() with (this, template, tuple);
//   2. stores the typed tuple pointer into element 0 of 'tuple_row';
//   3. for each conjunct, in the order given by node->conjuncts_, parses the slots
//      whose materialization order equals that conjunct's index and then evaluates the
//      conjunct, branching to a block that returns false if the conjunct is not
//      satisfied;
//   4. parses the slots whose materialization order equals conjuncts.size(), stores
//      the accumulated parse-error flag through 'error_in_row' and returns true.
// Per-slot parse errors are additionally stored into 'error_fields[slot_idx]'.
//
// 'node': plan node supplying the conjuncts, tuple descriptor, materialized slots and
//     table-level null column value.
// 'state': supplies the LlvmCodeGen instance, the descriptor table and query options.
// 'write_complete_tuple_fn': out parameter. Set to NULL on entry and overwritten with
//     the result of FinalizeFunction() at the end.
//
// Returns a non-OK Status if CodegenWriteSlot(), CodegenInitTuple() or
// GetCodegendComputeFn() fail, if the tuple's LLVM struct cannot be generated, or if
// FinalizeFunction() returns NULL. Returns Status::OK() otherwise.
Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanPlanNode* node,
FragmentState* state, llvm::Function** write_complete_tuple_fn) {
const vector<ScalarExpr*>& conjuncts = node->conjuncts_;
LlvmCodeGen* codegen = state->codegen();
*write_complete_tuple_fn = NULL;
// Cast away const-ness. The codegen only sets the cached typed llvm struct.
TupleDescriptor* tuple_desc = const_cast<TupleDescriptor*>(node->tuple_desc_);
// One entry per materialized slot, indexed like node->materialized_slots_. A nullptr
// entry marks a slot that has no codegen'd parse function and must go through the
// interpreted fallback below.
vector<llvm::Function*> slot_fns;
for (int i = 0; i < node->materialized_slots_.size(); ++i) {
llvm::Function* fn = nullptr;
SlotDescriptor* slot_desc = node->materialized_slots_[i];
// If the type is CHAR, WriteSlot for this slot cannot be codegen'd. To keep codegen
// for other things, we call the interpreted code for this slot from the codegen'd
// code instead of failing codegen. See IMPALA-9747.
if (TextConverter::SupportsCodegenWriteSlot(slot_desc->type())) {
RETURN_IF_ERROR(TextConverter::CodegenWriteSlot(codegen, tuple_desc, slot_desc,
&fn, node->hdfs_table_->null_column_value().data(),
node->hdfs_table_->null_column_value().size(), true,
state->query_options().strict_mode));
// Slot functions at or beyond the threshold index are marked no-inline.
// NOTE(review): presumably this bounds the size of the generated function -- confirm.
if (i >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) codegen->SetNoInline(fn);
}
slot_fns.push_back(fn);
}
// Compute order to materialize slots. BE assumes that conjuncts should
// be evaluated in the order specified (optimization is already done by FE)
vector<int> materialize_order;
node->ComputeSlotMaterializationOrder(state->desc_tbl(), &materialize_order);
// Get types to construct matching function signature to WriteCompleteTuple
llvm::PointerType* uint8_ptr_type = codegen->i8_ptr_type();
llvm::PointerType* field_loc_ptr_type = codegen->GetStructPtrType<FieldLocation>();
llvm::PointerType* tuple_opaque_ptr_type = codegen->GetStructPtrType<Tuple>();
llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
llvm::PointerType* mem_pool_ptr_type = codegen->GetStructPtrType<MemPool>();
llvm::PointerType* hdfs_scanner_ptr_type = codegen->GetStructPtrType<HdfsScanner>();
// Generate the typed llvm struct for the output tuple
llvm::StructType* tuple_type = tuple_desc->GetLlvmStruct(codegen);
if (tuple_type == NULL) return Status("Could not generate tuple struct.");
llvm::PointerType* tuple_ptr_type = llvm::PointerType::get(tuple_type, 0);
// Initialize the function prototype. This needs to match
// HdfsScanner::WriteCompleteTuple's signature identically.
LlvmCodeGen::FnPrototype prototype(
codegen, "WriteCompleteTuple", codegen->bool_type());
prototype.AddArgument(LlvmCodeGen::NamedVariable("this", hdfs_scanner_ptr_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("pool", mem_pool_ptr_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("fields", field_loc_ptr_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple", tuple_opaque_ptr_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple_row", tuple_row_ptr_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("template", tuple_opaque_ptr_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("error_fields", uint8_ptr_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("error_in_row", uint8_ptr_type));
llvm::LLVMContext& context = codegen->context();
LlvmBuilder builder(context);
// Sized to hold the eight arguments added to the prototype above.
llvm::Value* args[8];
llvm::Function* fn = prototype.GeneratePrototype(&builder, &args[0]);
// 'parse_block' is replaced with a fresh block after every conjunct; 'eval_fail_block'
// is the single shared exit taken when any conjunct rejects the tuple.
llvm::BasicBlock* parse_block = llvm::BasicBlock::Create(context, "parse", fn);
llvm::BasicBlock* eval_fail_block = llvm::BasicBlock::Create(context, "eval_fail", fn);
// Extract the input args
llvm::Value* this_arg = args[0];
llvm::Value* mem_pool_arg = args[1];
llvm::Value* fields_arg = args[2];
llvm::Value* opaque_tuple_arg = args[3];
// The tuple arrives as an opaque Tuple*; cast it to the typed struct pointer that the
// codegen'd slot functions are called with.
llvm::Value* tuple_arg =
builder.CreateBitCast(opaque_tuple_arg, tuple_ptr_type, "tuple_ptr");
llvm::Value* tuple_row_arg = args[4];
llvm::Value* opaque_template_arg = args[5];
llvm::Value* errors_arg = args[6];
llvm::Value* error_in_row_arg = args[7];
// Codegen for function body
// Running IR value for "some slot in this row failed to parse". Starts as the constant
// false and is OR'd with every slot's error value as slots are emitted.
llvm::Value* error_in_row = codegen->false_value();
llvm::Function* init_tuple_fn;
// NOTE(review): this and the later RETURN_IF_ERROR on the conjunct function return
// after 'fn' has already been created by GeneratePrototype(), leaving it unfinalized.
// Confirm that partially built functions are cleaned up elsewhere.
RETURN_IF_ERROR(CodegenInitTuple(node, codegen, &init_tuple_fn));
builder.CreateCall(init_tuple_fn, {this_arg, opaque_template_arg, opaque_tuple_arg});
// Put tuple in tuple_row
// 'tuple_row' is reinterpreted as a pointer to typed tuple pointers and the tuple is
// stored at index 0.
llvm::Value* tuple_row_typed =
builder.CreateBitCast(tuple_row_arg, llvm::PointerType::get(tuple_ptr_type, 0));
llvm::Value* tuple_row_idxs[] = {codegen->GetI32Constant(0)};
llvm::Value* tuple_in_row_addr =
builder.CreateInBoundsGEP(tuple_row_typed, tuple_row_idxs);
builder.CreateStore(tuple_arg, tuple_in_row_addr);
builder.CreateBr(parse_block);
// Loop through all the conjuncts in order and materialize slots as necessary to
// evaluate the conjuncts (e.g. conjuncts[0] will have the slots it references
// first).
// materialize_order[slot_idx] represents the first conjunct which needs that slot.
// Slots are only materialized if its order matches the current conjunct being
// processed. This guarantees that each slot is materialized once when it is first
// needed and that at the end of the materialize loop, the conjunct has everything
// it needs (either from this iteration or previous iterations).
// The loop bound is inclusive: the final iteration (conjunct_idx == conjuncts.size())
// handles the slots that no conjunct references and emits the success return.
builder.SetInsertPoint(parse_block);
for (int conjunct_idx = 0; conjunct_idx <= conjuncts.size(); ++conjunct_idx) {
for (int slot_idx = 0; slot_idx < materialize_order.size(); ++slot_idx) {
// If they don't match, it means either the slot has already been
// materialized for a previous conjunct or will be materialized later for
// another conjunct. Either case, the slot does not need to be materialized
// yet.
if (materialize_order[slot_idx] != conjunct_idx) continue;
// Materialize slots[slot_idx] to evaluate conjuncts[conjunct_idx]
// All slots[i] with materialize_order[i] < conjunct_idx have already been
// materialized by prior iterations through the outer loop
// Extract ptr/len from fields
// GEP indices {slot_idx, 0} and {slot_idx, 1} address struct members 0 and 1 of
// fields[slot_idx].
// NOTE(review): assumes FieldLocation's first two members are the data pointer and
// the length, in that order -- confirm against its definition.
llvm::Value* data_idxs[] = {
codegen->GetI32Constant(slot_idx),
codegen->GetI32Constant(0),
};
llvm::Value* len_idxs[] = {
codegen->GetI32Constant(slot_idx),
codegen->GetI32Constant(1),
};
// 'error_fields' is indexed with a single index, slot_idx.
llvm::Value* error_idxs[] = {
codegen->GetI32Constant(slot_idx),
};
llvm::Value* data_ptr =
builder.CreateInBoundsGEP(fields_arg, data_idxs, "data_ptr");
llvm::Value* len_ptr = builder.CreateInBoundsGEP(fields_arg, len_idxs, "len_ptr");
llvm::Value* error_ptr =
builder.CreateInBoundsGEP(errors_arg, error_idxs, "slot_error_ptr");
llvm::Value* data = builder.CreateLoad(data_ptr, "data");
llvm::Value* len = builder.CreateLoad(len_ptr, "len");
// Convert length to positive if it is negative. Negative lengths are assigned to
// slots that contain escape characters.
// TODO: CodegenWriteSlot() currently does not handle text that requires unescaping.
// However, if it is modified to handle that case, we need to detect it here and
// send a 'need_escape' bool to CodegenWriteSlot(), since we are making the length
// positive here.
// (~len) + 1 is the two's-complement negation of len; the select keeps the original
// value when len is already non-negative.
llvm::Value* len_lt_zero =
builder.CreateICmpSLT(len, codegen->GetI32Constant(0), "len_lt_zero");
llvm::Value* ones_compliment_len = builder.CreateNot(len, "ones_compliment_len");
llvm::Value* positive_len = builder.CreateAdd(
ones_compliment_len, codegen->GetI32Constant(1), "positive_len");
len = builder.CreateSelect(len_lt_zero, positive_len, len,
"select_positive_len");
// Call slot parse function
// A nullptr entry means no codegen'd function was built for this slot above; call the
// interpreted IR function instead, which needs the scanner, slot index and mem pool.
llvm::Function* slot_fn = slot_fns[slot_idx];
llvm::Value* slot_parsed = nullptr;
if (LIKELY(slot_fn != nullptr)) {
slot_parsed = builder.CreateCall(
slot_fn, llvm::ArrayRef<llvm::Value*>({tuple_arg, data, len}));
} else {
llvm::Value* slot_idx_value = codegen->GetI32Constant(slot_idx);
llvm::Function* interpreted_slot_fn = codegen->GetFunction(
IRFunction::HDFS_SCANNER_TEXT_CONVERTER_WRITE_SLOT_INTERPRETED_IR, false);
DCHECK(interpreted_slot_fn != nullptr);
slot_parsed = builder.CreateCall(interpreted_slot_fn,
{this_arg, slot_idx_value, opaque_tuple_arg, data, len, mem_pool_arg});
}
// The parse call's result is inverted to get the error flag, folded into the row-level
// flag, then widened to i8 and stored into error_fields[slot_idx].
llvm::Value* slot_error = builder.CreateNot(slot_parsed, "slot_parse_error");
error_in_row = builder.CreateOr(error_in_row, slot_error, "error_in_row");
slot_error = builder.CreateZExt(slot_error, codegen->i8_type());
builder.CreateStore(slot_error, error_ptr);
}
if (conjunct_idx == conjuncts.size()) {
// In this branch, we've just materialized slots not referenced by any conjunct.
// These slots are the last to get materialized. If we are in this branch, the
// tuple passed all conjuncts and should be added to the row batch.
llvm::Value* error_ret =
builder.CreateZExt(error_in_row, codegen->i8_type());
builder.CreateStore(error_ret, error_in_row_arg);
builder.CreateRet(codegen->true_value());
} else {
// All slots for conjuncts[conjunct_idx] are materialized, evaluate the partial
// tuple against that conjunct and start a new parse_block for the next conjunct
// The new block is inserted before eval_fail_block so that eval_fail_block stays the
// last block of the function.
parse_block = llvm::BasicBlock::Create(context, "parse", fn, eval_fail_block);
llvm::Function* conjunct_fn;
RETURN_IF_ERROR(
conjuncts[conjunct_idx]->GetCodegendComputeFn(codegen, false, &conjunct_fn));
// Slots and conjuncts share one inlining budget: conjunct functions are marked
// no-inline once the slot count plus the conjunct index reaches the threshold.
if (node->materialized_slots_.size() + conjunct_idx
>= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) {
codegen->SetNoInline(conjunct_fn);
}
// Fetch the evaluator for this conjunct from the scanner at run time, then call the
// conjunct function with (evaluator, tuple_row).
llvm::Function* get_eval_fn =
codegen->GetFunction(IRFunction::HDFS_SCANNER_GET_CONJUNCT_EVALUATOR, false);
llvm::Value* eval = builder.CreateCall(
get_eval_fn, llvm::ArrayRef<llvm::Value*>(
{this_arg, codegen->GetI32Constant(conjunct_idx)}));
llvm::Value* conjunct_args[] = {eval, tuple_row_arg};
CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(
codegen, &builder, ColumnType(TYPE_BOOLEAN), conjunct_fn, conjunct_args,
"conjunct_eval");
// Continue with the next parse block if the conjunct's value is true, otherwise jump
// to the shared failure exit.
builder.CreateCondBr(result.GetVal(), parse_block, eval_fail_block);
builder.SetInsertPoint(parse_block);
}
}
// Block if eval failed.
builder.SetInsertPoint(eval_fail_block);
builder.CreateRet(codegen->false_value());
// Mark the whole generated function no-inline when the combined number of slots and
// conjuncts exceeds the batch threshold.
if (node->materialized_slots_.size() + conjuncts.size()
> LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) {
codegen->SetNoInline(fn);
}
*write_complete_tuple_fn = codegen->FinalizeFunction(fn);
if (*write_complete_tuple_fn == NULL) {
return Status("Failed to finalize write_complete_tuple_fn.");
}
return Status::OK();
}