Status FileScanner::_get_next_reader()

in be/src/vec/exec/scan/file_scanner.cpp [863:1218]


/// Advances the scanner to the next scan range and builds a reader for it.
///
/// Each loop iteration:
///   1. closes the current reader (if any) and counts its range as finished;
///   2. obtains the next range: when `_first_scan_range` is set, `_current_range` is used
///      as-is, otherwise the range is fetched from `_split_source`;
///   3. when partition slots exist, generates partition columns and, if enabled in the query
///      options, skips the range through runtime-filter partition pruning;
///   4. creates and initializes a reader matching the effective file format.
///
/// A range whose reader init reports END_OF_FILE, or NOT_FOUND while
/// `config::ignore_not_found_file_in_external_table` is set, is skipped and the loop
/// proceeds to the following range.
///
/// @return OK with `_scanner_eof == true` when there is no further range or `_should_stop`
///         is set; OK with `_cur_reader` set and `_cur_reader_eof == false` when a reader is
///         ready; otherwise the first error encountered.
Status FileScanner::_get_next_reader() {
    while (true) {
        // Finish the previous range first. This also closes a reader that was created in
        // the previous iteration but skipped via `continue` (empty / not-found file).
        if (_cur_reader) {
            _cur_reader->collect_profile_before_close();
            RETURN_IF_ERROR(_cur_reader->close());
            _state->update_num_finished_scan_range(1);
        }
        _cur_reader.reset(nullptr);
        _src_block_init = false;
        bool has_next = _first_scan_range;
        if (!_first_scan_range) {
            RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range));
        }
        _first_scan_range = false;
        if (!has_next || _should_stop) {
            _scanner_eof = true;
            return Status::OK();
        }

        const TFileRangeDesc& range = _current_range;
        _current_range_path = range.path;

        // True when the range carries table-format params naming the given table format.
        const auto is_table_format = [&range](const char* table_format) {
            return range.__isset.table_format_params &&
                   range.table_format_params.table_format_type == table_format;
        };

        if (!_partition_slot_descs.empty()) {
            // we need get partition columns first for runtime filter partition pruning
            RETURN_IF_ERROR(_generate_parititon_columns());

            if (_state->query_options().enable_runtime_filter_partition_prune) {
                // if enable_runtime_filter_partition_prune is true, we need to check whether
                // this range can be filtered out by runtime filter partition prune
                if (_push_down_conjuncts.size() < _conjuncts.size()) {
                    // there are new runtime filters, need to re-init runtime filter
                    // partition pruning ctxs
                    _init_runtime_filter_partition_prune_ctxs();
                }

                bool can_filter_all = false;
                RETURN_IF_ERROR(_process_runtime_filters_partition_prune(can_filter_all));
                if (can_filter_all) {
                    // this range can be filtered out by runtime filter partition pruning
                    // so we need to skip this range
                    COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1);
                    continue;
                }
            }
        }

        // create reader for specific format
        Status init_status;
        // for compatibility, if format_type is not set in range, use the format type of params
        TFileFormatType::type format_type =
                range.__isset.format_type ? range.format_type : _params->format_type;
        // JNI reader can only push down column value range
        // NOTE(review): this tests `_params->format_type`, not the effective `format_type`
        // resolved above -- confirm that is intended when the range overrides the format.
        bool push_down_predicates =
                !_is_load && _params->format_type != TFileFormatType::FORMAT_JNI;
        // for compatibility, this logic is deprecated in 3.1
        if (format_type == TFileFormatType::FORMAT_JNI && is_table_format("paimon") &&
            !range.table_format_params.paimon_params.__isset.paimon_split) {
            // no paimon split is set: use native reader chosen by the paimon file format
            const auto& format = range.table_format_params.paimon_params.file_format;
            if (format == "orc") {
                format_type = TFileFormatType::FORMAT_ORC;
            } else if (format == "parquet") {
                format_type = TFileFormatType::FORMAT_PARQUET;
            } else {
                return Status::InternalError("Not supported paimon file format: {}", format);
            }
        }
        bool need_to_get_parsed_schema = false;
        switch (format_type) {
        case TFileFormatType::FORMAT_JNI: {
            // If no table format below matches, `_cur_reader` stays null and the check
            // after the switch reports the failure.
            if (is_table_format("max_compute")) {
                const auto* mc_desc = static_cast<const MaxComputeTableDescriptor*>(
                        _real_tuple_desc->table_desc());
                if (!mc_desc->init_status()) {
                    return mc_desc->init_status();
                }
                std::unique_ptr<MaxComputeJniReader> mc_reader = MaxComputeJniReader::create_unique(
                        mc_desc, range.table_format_params.max_compute_params, _file_slot_descs,
                        range, _state, _profile);
                init_status = mc_reader->init_reader(_colname_to_value_range);
                _cur_reader = std::move(mc_reader);
            } else if (is_table_format("paimon")) {
                _cur_reader = PaimonJniReader::create_unique(_file_slot_descs, _state, _profile,
                                                             range, _params);
                init_status = static_cast<PaimonJniReader*>(_cur_reader.get())
                                      ->init_reader(_colname_to_value_range);
            } else if (is_table_format("hudi")) {
                _cur_reader = HudiJniReader::create_unique(*_params,
                                                           range.table_format_params.hudi_params,
                                                           _file_slot_descs, _state, _profile);
                init_status = static_cast<HudiJniReader*>(_cur_reader.get())
                                      ->init_reader(_colname_to_value_range);
            } else if (is_table_format("lakesoul")) {
                _cur_reader =
                        LakeSoulJniReader::create_unique(range.table_format_params.lakesoul_params,
                                                         _file_slot_descs, _state, _profile);
                init_status = static_cast<LakeSoulJniReader*>(_cur_reader.get())
                                      ->init_reader(_colname_to_value_range);
            } else if (is_table_format("trino_connector")) {
                _cur_reader = TrinoConnectorJniReader::create_unique(_file_slot_descs, _state,
                                                                     _profile, range);
                init_status = static_cast<TrinoConnectorJniReader*>(_cur_reader.get())
                                      ->init_reader(_colname_to_value_range);
            }
            break;
        }
        case TFileFormatType::FORMAT_PARQUET: {
            std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
                    _profile, *_params, range, _state->query_options().batch_size,
                    const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx.get(), _state,
                    _should_enable_file_meta_cache() ? ExecEnv::GetInstance()->file_meta_cache()
                                                     : nullptr,
                    _state->query_options().enable_parquet_lazy_mat);
            // ATTN: the push down agg type may be set back to NONE,
            // see IcebergTableReader::init_row_filters for example.
            parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
            {
                SCOPED_TIMER(_open_reader_timer);
                RETURN_IF_ERROR(parquet_reader->open());
            }
            if (push_down_predicates) {
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
            }
            if (is_table_format("iceberg")) {
                std::unique_ptr<IcebergParquetReader> iceberg_reader =
                        IcebergParquetReader::create_unique(std::move(parquet_reader), _profile,
                                                            _state, *_params, range, _kv_cache,
                                                            _io_ctx.get());
                init_status = iceberg_reader->init_reader(
                        _file_col_names, _col_id_name_map, _colname_to_value_range,
                        _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
                        _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
                        &_slot_id_to_filter_conjuncts);
                _cur_reader = std::move(iceberg_reader);
            } else if (is_table_format("paimon")) {
                std::unique_ptr<PaimonParquetReader> paimon_reader =
                        PaimonParquetReader::create_unique(std::move(parquet_reader), _profile,
                                                           _state, *_params, range, _io_ctx.get());
                init_status = paimon_reader->init_reader(
                        _file_col_names, _col_id_name_map, _colname_to_value_range,
                        _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
                        _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
                        &_slot_id_to_filter_conjuncts);
                RETURN_IF_ERROR(paimon_reader->init_row_filters());
                _cur_reader = std::move(paimon_reader);
            } else if (is_table_format("hudi")) {
                std::unique_ptr<HudiParquetReader> hudi_reader =
                        HudiParquetReader::create_unique(std::move(parquet_reader), _profile,
                                                         _state, *_params, range, _io_ctx.get());
                init_status = hudi_reader->init_reader(
                        _file_col_names, _col_id_name_map, _colname_to_value_range,
                        _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
                        _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
                        &_slot_id_to_filter_conjuncts);
                _cur_reader = std::move(hudi_reader);
            } else {
                // Plain parquet; only hive ranges take the column-name option from the query.
                bool hive_parquet_use_column_names = true;

                if (is_table_format("hive") && _state != nullptr) [[likely]] {
                    hive_parquet_use_column_names =
                            _state->query_options().hive_parquet_use_column_names;
                }

                std::vector<std::string> place_holder;
                init_status = parquet_reader->init_reader(
                        _file_col_names, place_holder, _colname_to_value_range,
                        _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
                        _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
                        &_slot_id_to_filter_conjuncts, true, hive_parquet_use_column_names);
                _cur_reader = std::move(parquet_reader);
            }
            need_to_get_parsed_schema = true;
            break;
        }
        case TFileFormatType::FORMAT_ORC: {
            std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
                    _profile, _state, *_params, range, _state->query_options().batch_size,
                    _state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat);
            orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
            if (push_down_predicates) {
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
            }
            if (is_table_format("transactional_hive")) {
                std::unique_ptr<TransactionalHiveReader> tran_orc_reader =
                        TransactionalHiveReader::create_unique(std::move(orc_reader), _profile,
                                                               _state, *_params, range,
                                                               _io_ctx.get());
                init_status = tran_orc_reader->init_reader(
                        _file_col_names, _colname_to_value_range, _push_down_conjuncts,
                        _real_tuple_desc, _default_val_row_desc.get(),
                        &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
                RETURN_IF_ERROR(tran_orc_reader->init_row_filters());
                _cur_reader = std::move(tran_orc_reader);
            } else if (is_table_format("iceberg")) {
                std::unique_ptr<IcebergOrcReader> iceberg_reader =
                        IcebergOrcReader::create_unique(std::move(orc_reader), _profile, _state,
                                                        *_params, range, _kv_cache, _io_ctx.get());

                init_status = iceberg_reader->init_reader(
                        _file_col_names, _col_id_name_map, _colname_to_value_range,
                        _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
                        _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
                        &_slot_id_to_filter_conjuncts);
                _cur_reader = std::move(iceberg_reader);
            } else if (is_table_format("paimon")) {
                std::unique_ptr<PaimonOrcReader> paimon_reader = PaimonOrcReader::create_unique(
                        std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get());

                init_status = paimon_reader->init_reader(
                        _file_col_names, _col_id_name_map, _colname_to_value_range,
                        _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
                        &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
                RETURN_IF_ERROR(paimon_reader->init_row_filters());
                _cur_reader = std::move(paimon_reader);
            } else if (is_table_format("hudi")) {
                std::unique_ptr<HudiOrcReader> hudi_reader = HudiOrcReader::create_unique(
                        std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get());

                init_status = hudi_reader->init_reader(
                        _file_col_names, _col_id_name_map, _colname_to_value_range,
                        _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
                        &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
                _cur_reader = std::move(hudi_reader);
            } else {
                // Plain ORC; only hive ranges take the column-name option from the query.
                bool hive_orc_use_column_names = true;

                if (is_table_format("hive") && _state != nullptr) [[likely]] {
                    hive_orc_use_column_names = _state->query_options().hive_orc_use_column_names;
                }
                init_status = orc_reader->init_reader(
                        &_file_col_names, {}, _colname_to_value_range, _push_down_conjuncts, false,
                        _real_tuple_desc, _default_val_row_desc.get(),
                        &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts,
                        hive_orc_use_column_names);
                _cur_reader = std::move(orc_reader);
            }
            need_to_get_parsed_schema = true;
            break;
        }
        case TFileFormatType::FORMAT_CSV_PLAIN:
        case TFileFormatType::FORMAT_CSV_GZ:
        case TFileFormatType::FORMAT_CSV_BZ2:
        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
        case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
        case TFileFormatType::FORMAT_CSV_LZOP:
        case TFileFormatType::FORMAT_CSV_DEFLATE:
        case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
        case TFileFormatType::FORMAT_PROTO: {
            _cur_reader = CsvReader::create_unique(_state, _profile, &_counter, *_params, range,
                                                   _file_slot_descs, _io_ctx.get());
            init_status = static_cast<CsvReader*>(_cur_reader.get())->init_reader(_is_load);
            break;
        }
        case TFileFormatType::FORMAT_JSON: {
            _cur_reader =
                    NewJsonReader::create_unique(_state, _profile, &_counter, *_params, range,
                                                 _file_slot_descs, &_scanner_eof, _io_ctx.get());
            init_status = static_cast<NewJsonReader*>(_cur_reader.get())
                                  ->init_reader(_col_default_value_ctx, _is_load);
            break;
        }
        case TFileFormatType::FORMAT_AVRO: {
            _cur_reader = AvroJNIReader::create_unique(_state, _profile, *_params, _file_slot_descs,
                                                       range);
            init_status = static_cast<AvroJNIReader*>(_cur_reader.get())
                                  ->init_fetch_table_reader(_colname_to_value_range);
            break;
        }
        case TFileFormatType::FORMAT_WAL: {
            _cur_reader.reset(new WalReader(_state));
            init_status = static_cast<WalReader*>(_cur_reader.get())->init_reader(_output_tuple_desc);
            break;
        }
        case TFileFormatType::FORMAT_ARROW: {
            _cur_reader = ArrowStreamReader::create_unique(_state, _profile, &_counter, *_params,
                                                           range, _file_slot_descs, _io_ctx.get());
            init_status = static_cast<ArrowStreamReader*>(_cur_reader.get())->init_reader();
            break;
        }
        default:
            // Report the format that was actually dispatched on, which may differ from
            // `_params->format_type` when the range overrides it.
            return Status::InternalError("Not supported file format: {}", format_type);
        }

        if (_cur_reader == nullptr) {
            return Status::InternalError("Failed to create reader for  file format: {}",
                                         format_type);
        }
        COUNTER_UPDATE(_file_counter, 1);
        // The FileScanner for external table may try to open not exist files,
        // Because FE file cache for external table may out of date.
        // So, NOT_FOUND for FileScanner is not a fail case.
        // Will remove this after file reader refactor.
        if (init_status.is<END_OF_FILE>()) {
            COUNTER_UPDATE(_empty_file_counter, 1);
            continue;
        } else if (init_status.is<ErrorCode::NOT_FOUND>()) {
            if (config::ignore_not_found_file_in_external_table) {
                COUNTER_UPDATE(_not_found_file_counter, 1);
                continue;
            }
            return Status::InternalError("failed to find reader, err: {}", init_status.to_string());
        } else if (!init_status.ok()) {
            return Status::InternalError("failed to init reader, err: {}", init_status.to_string());
        }

        // Refresh per-file column info and tell the reader which columns it must fill.
        _name_to_col_type.clear();
        _missing_cols.clear();
        RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, &_missing_cols));
        _cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
        RETURN_IF_ERROR(_generate_missing_columns());
        RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs));
        if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
            fmt::memory_buffer col_buf;
            for (const auto& col : _missing_cols) {
                fmt::format_to(col_buf, " {}", col);
            }
            VLOG_NOTICE << fmt::format("Unknown columns:{} in file {}", fmt::to_string(col_buf),
                                       range.path);
        }

        // Rebuild the file-schema name -> type map; only populated for parquet/orc readers
        // when char/varchar truncation is enabled.
        _source_file_col_names.clear();
        _source_file_col_types.clear();
        _source_file_col_name_types.clear();
        if (_state->query_options().truncate_char_or_varchar_columns && need_to_get_parsed_schema) {
            Status status = _cur_reader->get_parsed_schema(&_source_file_col_names,
                                                           &_source_file_col_types);
            // A reader that does not implement schema parsing is tolerated.
            if (!status.ok() && status.code() != TStatusCode::NOT_IMPLEMENTED_ERROR) {
                return status;
            }
            DCHECK(_source_file_col_names.size() == _source_file_col_types.size());
            for (size_t i = 0; i < _source_file_col_names.size(); ++i) {
                _source_file_col_name_types[_source_file_col_names[i]] = _source_file_col_types[i];
            }
        }
        _cur_reader_eof = false;
        break;
    }
    return Status::OK();
}