in be/src/vec/exec/scan/file_scanner.cpp [863:1218]
// Closes the reader of the previous scan range (if any) and advances to the next scan range
// for which a reader can be created and initialized.
//
// The loop ends in one of three ways:
//   - there is no further range, or `_should_stop` is set: `_scanner_eof` is set and OK is
//     returned with `_cur_reader` left null;
//   - a reader was created and initialized: it is stored in `_cur_reader`, `_cur_reader_eof`
//     is cleared and OK is returned;
//   - any step fails: the error status is returned.
//
// A range is skipped (the loop moves on to the next one) when runtime filter partition pruning
// filters it out, when reader initialization reports END_OF_FILE, or when it reports NOT_FOUND
// and config::ignore_not_found_file_in_external_table is enabled.
Status FileScanner::_get_next_reader() {
    while (true) {
        // Finish the reader left over from the previous range before moving on.
        if (_cur_reader) {
            _cur_reader->collect_profile_before_close();
            RETURN_IF_ERROR(_cur_reader->close());
            _state->update_num_finished_scan_range(1);
        }
        _cur_reader.reset(nullptr);
        _src_block_init = false;

        // On the first call `_current_range` is used as-is; afterwards the next range is
        // pulled from the split source.
        bool has_next = _first_scan_range;
        if (!_first_scan_range) {
            RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range));
        }
        _first_scan_range = false;
        if (!has_next || _should_stop) {
            _scanner_eof = true;
            return Status::OK();
        }

        const TFileRangeDesc& range = _current_range;
        _current_range_path = range.path;

        // True when the range carries table format params of the given table format type.
        const auto is_table_format = [&range](const char* table_format_type) {
            return range.__isset.table_format_params &&
                   range.table_format_params.table_format_type == table_format_type;
        };

        if (!_partition_slot_descs.empty()) {
            // we need get partition columns first for runtime filter partition pruning
            RETURN_IF_ERROR(_generate_parititon_columns());
            if (_state->query_options().enable_runtime_filter_partition_prune) {
                // if enable_runtime_filter_partition_prune is true, we need to check whether
                // this range can be filtered out by runtime filter partition prune
                if (_push_down_conjuncts.size() < _conjuncts.size()) {
                    // there are new runtime filters, need to re-init runtime filter
                    // partition pruning ctxs
                    _init_runtime_filter_partition_prune_ctxs();
                }
                bool can_filter_all = false;
                RETURN_IF_ERROR(_process_runtime_filters_partition_prune(can_filter_all));
                if (can_filter_all) {
                    // this range can be filtered out by runtime filter partition pruning
                    // so we need to skip this range
                    COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1);
                    continue;
                }
            }
        }

        // create reader for specific format
        Status init_status;
        // for compatibility, if format_type is not set in range, use the format type of params
        TFileFormatType::type format_type =
                range.__isset.format_type ? range.format_type : _params->format_type;
        // JNI reader can only push down column value range
        // NOTE(review): this checks `_params->format_type`, not the per-range `format_type`
        // resolved above; confirm that is intended when the two differ.
        const bool push_down_predicates =
                !_is_load && _params->format_type != TFileFormatType::FORMAT_JNI;
        // for compatibility, this logic is deprecated in 3.1
        if (format_type == TFileFormatType::FORMAT_JNI && is_table_format("paimon") &&
            !range.table_format_params.paimon_params.__isset.paimon_split) {
            // use native reader
            auto format = range.table_format_params.paimon_params.file_format;
            if (format == "orc") {
                format_type = TFileFormatType::FORMAT_ORC;
            } else if (format == "parquet") {
                format_type = TFileFormatType::FORMAT_PARQUET;
            } else {
                return Status::InternalError("Not supported paimon file format: {}", format);
            }
        }

        // Only the parquet and orc branches below request the parsed file schema.
        bool need_to_get_parsed_schema = false;
        switch (format_type) {
        case TFileFormatType::FORMAT_JNI: {
            // If no table format matches, `_cur_reader` stays null and is reported after
            // the switch.
            if (is_table_format("max_compute")) {
                const auto* mc_desc = static_cast<const MaxComputeTableDescriptor*>(
                        _real_tuple_desc->table_desc());
                if (!mc_desc->init_status()) {
                    return mc_desc->init_status();
                }
                std::unique_ptr<MaxComputeJniReader> mc_reader = MaxComputeJniReader::create_unique(
                        mc_desc, range.table_format_params.max_compute_params, _file_slot_descs,
                        range, _state, _profile);
                init_status = mc_reader->init_reader(_colname_to_value_range);
                _cur_reader = std::move(mc_reader);
            } else if (is_table_format("paimon")) {
                std::unique_ptr<PaimonJniReader> paimon_reader = PaimonJniReader::create_unique(
                        _file_slot_descs, _state, _profile, range, _params);
                init_status = paimon_reader->init_reader(_colname_to_value_range);
                _cur_reader = std::move(paimon_reader);
            } else if (is_table_format("hudi")) {
                std::unique_ptr<HudiJniReader> hudi_reader = HudiJniReader::create_unique(
                        *_params, range.table_format_params.hudi_params, _file_slot_descs, _state,
                        _profile);
                init_status = hudi_reader->init_reader(_colname_to_value_range);
                _cur_reader = std::move(hudi_reader);
            } else if (is_table_format("lakesoul")) {
                std::unique_ptr<LakeSoulJniReader> lakesoul_reader =
                        LakeSoulJniReader::create_unique(range.table_format_params.lakesoul_params,
                                                         _file_slot_descs, _state, _profile);
                init_status = lakesoul_reader->init_reader(_colname_to_value_range);
                _cur_reader = std::move(lakesoul_reader);
            } else if (is_table_format("trino_connector")) {
                std::unique_ptr<TrinoConnectorJniReader> trino_reader =
                        TrinoConnectorJniReader::create_unique(_file_slot_descs, _state, _profile,
                                                               range);
                init_status = trino_reader->init_reader(_colname_to_value_range);
                _cur_reader = std::move(trino_reader);
            }
            break;
        }
        case TFileFormatType::FORMAT_PARQUET: {
            std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
                    _profile, *_params, range, _state->query_options().batch_size,
                    const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx.get(), _state,
                    _should_enable_file_meta_cache() ? ExecEnv::GetInstance()->file_meta_cache()
                                                     : nullptr,
                    _state->query_options().enable_parquet_lazy_mat);
            // ATTN: the push down agg type may be set back to NONE,
            // see IcebergTableReader::init_row_filters for example.
            parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
            {
                SCOPED_TIMER(_open_reader_timer);
                RETURN_IF_ERROR(parquet_reader->open());
            }
            if (push_down_predicates) {
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
            }
            if (is_table_format("iceberg")) {
                std::unique_ptr<IcebergParquetReader> iceberg_reader =
                        IcebergParquetReader::create_unique(std::move(parquet_reader), _profile,
                                                            _state, *_params, range, _kv_cache,
                                                            _io_ctx.get());
                init_status = iceberg_reader->init_reader(
                        _file_col_names, _col_id_name_map, _colname_to_value_range,
                        _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
                        _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
                        &_slot_id_to_filter_conjuncts);
                _cur_reader = std::move(iceberg_reader);
            } else if (is_table_format("paimon")) {
                std::unique_ptr<PaimonParquetReader> paimon_reader =
                        PaimonParquetReader::create_unique(std::move(parquet_reader), _profile,
                                                           _state, *_params, range, _io_ctx.get());
                init_status = paimon_reader->init_reader(
                        _file_col_names, _col_id_name_map, _colname_to_value_range,
                        _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
                        _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
                        &_slot_id_to_filter_conjuncts);
                // NOTE(review): row filters are initialized before `init_status` is inspected;
                // confirm this is safe when init_reader() did not succeed.
                RETURN_IF_ERROR(paimon_reader->init_row_filters());
                _cur_reader = std::move(paimon_reader);
            } else if (is_table_format("hudi")) {
                std::unique_ptr<HudiParquetReader> hudi_reader =
                        HudiParquetReader::create_unique(std::move(parquet_reader), _profile,
                                                         _state, *_params, range, _io_ctx.get());
                init_status = hudi_reader->init_reader(
                        _file_col_names, _col_id_name_map, _colname_to_value_range,
                        _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
                        _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
                        &_slot_id_to_filter_conjuncts);
                _cur_reader = std::move(hudi_reader);
            } else {
                // Plain parquet file; for hive tables the query option decides whether
                // columns are matched by name.
                bool hive_parquet_use_column_names = true;
                if (is_table_format("hive") && _state != nullptr) [[likely]] {
                    hive_parquet_use_column_names =
                            _state->query_options().hive_parquet_use_column_names;
                }
                std::vector<std::string> place_holder;
                init_status = parquet_reader->init_reader(
                        _file_col_names, place_holder, _colname_to_value_range,
                        _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
                        _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
                        &_slot_id_to_filter_conjuncts, true, hive_parquet_use_column_names);
                _cur_reader = std::move(parquet_reader);
            }
            need_to_get_parsed_schema = true;
            break;
        }
        case TFileFormatType::FORMAT_ORC: {
            std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
                    _profile, _state, *_params, range, _state->query_options().batch_size,
                    _state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat);
            orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
            if (push_down_predicates) {
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
            }
            if (is_table_format("transactional_hive")) {
                std::unique_ptr<TransactionalHiveReader> tran_orc_reader =
                        TransactionalHiveReader::create_unique(std::move(orc_reader), _profile,
                                                               _state, *_params, range,
                                                               _io_ctx.get());
                init_status = tran_orc_reader->init_reader(
                        _file_col_names, _colname_to_value_range, _push_down_conjuncts,
                        _real_tuple_desc, _default_val_row_desc.get(),
                        &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
                // NOTE(review): row filters are initialized before `init_status` is inspected;
                // confirm this is safe when init_reader() did not succeed.
                RETURN_IF_ERROR(tran_orc_reader->init_row_filters());
                _cur_reader = std::move(tran_orc_reader);
            } else if (is_table_format("iceberg")) {
                std::unique_ptr<IcebergOrcReader> iceberg_reader =
                        IcebergOrcReader::create_unique(std::move(orc_reader), _profile, _state,
                                                        *_params, range, _kv_cache, _io_ctx.get());
                init_status = iceberg_reader->init_reader(
                        _file_col_names, _col_id_name_map, _colname_to_value_range,
                        _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
                        _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
                        &_slot_id_to_filter_conjuncts);
                _cur_reader = std::move(iceberg_reader);
            } else if (is_table_format("paimon")) {
                std::unique_ptr<PaimonOrcReader> paimon_reader = PaimonOrcReader::create_unique(
                        std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get());
                init_status = paimon_reader->init_reader(
                        _file_col_names, _col_id_name_map, _colname_to_value_range,
                        _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
                        &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
                // NOTE(review): row filters are initialized before `init_status` is inspected;
                // confirm this is safe when init_reader() did not succeed.
                RETURN_IF_ERROR(paimon_reader->init_row_filters());
                _cur_reader = std::move(paimon_reader);
            } else if (is_table_format("hudi")) {
                std::unique_ptr<HudiOrcReader> hudi_reader = HudiOrcReader::create_unique(
                        std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get());
                init_status = hudi_reader->init_reader(
                        _file_col_names, _col_id_name_map, _colname_to_value_range,
                        _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
                        &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
                _cur_reader = std::move(hudi_reader);
            } else {
                // Plain orc file; for hive tables the query option decides whether columns
                // are matched by name.
                bool hive_orc_use_column_names = true;
                if (is_table_format("hive") && _state != nullptr) [[likely]] {
                    hive_orc_use_column_names = _state->query_options().hive_orc_use_column_names;
                }
                init_status = orc_reader->init_reader(
                        &_file_col_names, {}, _colname_to_value_range, _push_down_conjuncts, false,
                        _real_tuple_desc, _default_val_row_desc.get(),
                        &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts,
                        hive_orc_use_column_names);
                _cur_reader = std::move(orc_reader);
            }
            need_to_get_parsed_schema = true;
            break;
        }
        case TFileFormatType::FORMAT_CSV_PLAIN:
        case TFileFormatType::FORMAT_CSV_GZ:
        case TFileFormatType::FORMAT_CSV_BZ2:
        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
        case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
        case TFileFormatType::FORMAT_CSV_LZOP:
        case TFileFormatType::FORMAT_CSV_DEFLATE:
        case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
        case TFileFormatType::FORMAT_PROTO: {
            std::unique_ptr<CsvReader> csv_reader = CsvReader::create_unique(
                    _state, _profile, &_counter, *_params, range, _file_slot_descs, _io_ctx.get());
            init_status = csv_reader->init_reader(_is_load);
            _cur_reader = std::move(csv_reader);
            break;
        }
        case TFileFormatType::FORMAT_JSON: {
            std::unique_ptr<NewJsonReader> json_reader =
                    NewJsonReader::create_unique(_state, _profile, &_counter, *_params, range,
                                                 _file_slot_descs, &_scanner_eof, _io_ctx.get());
            init_status = json_reader->init_reader(_col_default_value_ctx, _is_load);
            _cur_reader = std::move(json_reader);
            break;
        }
        case TFileFormatType::FORMAT_AVRO: {
            std::unique_ptr<AvroJNIReader> avro_reader = AvroJNIReader::create_unique(
                    _state, _profile, *_params, _file_slot_descs, range);
            init_status = avro_reader->init_fetch_table_reader(_colname_to_value_range);
            _cur_reader = std::move(avro_reader);
            break;
        }
        case TFileFormatType::FORMAT_WAL: {
            // Constructed with `new`, exactly as before; only the ownership handle is typed so
            // that init_reader() can be called without a downcast.
            std::unique_ptr<WalReader> wal_reader(new WalReader(_state));
            init_status = wal_reader->init_reader(_output_tuple_desc);
            _cur_reader = std::move(wal_reader);
            break;
        }
        case TFileFormatType::FORMAT_ARROW: {
            std::unique_ptr<ArrowStreamReader> arrow_reader = ArrowStreamReader::create_unique(
                    _state, _profile, &_counter, *_params, range, _file_slot_descs, _io_ctx.get());
            init_status = arrow_reader->init_reader();
            _cur_reader = std::move(arrow_reader);
            break;
        }
        default:
            // Report the format that was actually dispatched on (it may come from the range,
            // not from `_params`).
            return Status::InternalError("Not supported file format: {}", format_type);
        }

        if (_cur_reader == nullptr) {
            // Reached when the JNI case did not recognize the table format type.
            return Status::InternalError("Failed to create reader for file format: {}",
                                         format_type);
        }
        COUNTER_UPDATE(_file_counter, 1);
        // The FileScanner for external table may try to open not exist files,
        // Because FE file cache for external table may out of date.
        // So, NOT_FOUND for FileScanner is not a fail case.
        // Will remove this after file reader refactor.
        if (init_status.is<END_OF_FILE>()) {
            // Empty file: skip it; the reader is closed at the top of the next iteration.
            COUNTER_UPDATE(_empty_file_counter, 1);
            continue;
        } else if (init_status.is<ErrorCode::NOT_FOUND>()) {
            if (config::ignore_not_found_file_in_external_table) {
                COUNTER_UPDATE(_not_found_file_counter, 1);
                continue;
            }
            return Status::InternalError("failed to find reader, err: {}", init_status.to_string());
        } else if (!init_status.ok()) {
            return Status::InternalError("failed to init reader, err: {}", init_status.to_string());
        }

        // Refresh the per-file column information and tell the reader which partition and
        // missing columns it has to fill.
        _name_to_col_type.clear();
        _missing_cols.clear();
        RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, &_missing_cols));
        _cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
        RETURN_IF_ERROR(_generate_missing_columns());
        RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs));
        if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
            fmt::memory_buffer col_buf;
            for (const auto& col : _missing_cols) {
                fmt::format_to(col_buf, " {}", col);
            }
            VLOG_NOTICE << fmt::format("Unknown columns:{} in file {}", fmt::to_string(col_buf),
                                       range.path);
        }

        // Rebuild the name -> type map of the source file schema; it is only populated for
        // parquet/orc when char/varchar truncation is enabled.
        _source_file_col_names.clear();
        _source_file_col_types.clear();
        _source_file_col_name_types.clear();
        if (_state->query_options().truncate_char_or_varchar_columns && need_to_get_parsed_schema) {
            Status status = _cur_reader->get_parsed_schema(&_source_file_col_names,
                                                           &_source_file_col_types);
            // A reader that does not implement get_parsed_schema() is tolerated.
            if (!status.ok() && status.code() != TStatusCode::NOT_IMPLEMENTED_ERROR) {
                return status;
            }
            DCHECK(_source_file_col_names.size() == _source_file_col_types.size());
            for (size_t i = 0; i < _source_file_col_names.size(); ++i) {
                _source_file_col_name_types[_source_file_col_names[i]] = _source_file_col_types[i];
            }
        }
        _cur_reader_eof = false;
        break;
    }
    return Status::OK();
}