include/hbase/client/async-client-scanner.h (86 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #pragma once #include <folly/Format.h> #include <folly/Logging.h> #include <folly/futures/Future.h> #include <folly/io/async/EventBase.h> #include <folly/io/async/HHWheelTimer.h> #include <algorithm> #include <chrono> #include <functional> #include <memory> #include <string> #include <type_traits> #include <utility> #include <vector> #include "hbase/connection/rpc-client.h" #include "hbase/client/async-connection.h" #include "hbase/client/async-rpc-retrying-caller-factory.h" #include "hbase/client/async-rpc-retrying-caller.h" #include "hbase/client/hbase-rpc-controller.h" #include "hbase/client/raw-scan-result-consumer.h" #include "hbase/client/region-location.h" #include "hbase/client/request-converter.h" #include "hbase/client/response-converter.h" #include "hbase/client/result.h" #include "hbase/client/scan-result-cache.h" #include "hbase/client/scan.h" #include "hbase/exceptions/exception.h" #include "client/Client.pb.h" #include "HBase.pb.h" #include "hbase/utils/connection-util.h" #include "hbase/utils/sys-util.h" #include "hbase/utils/time-util.h" using std::chrono::nanoseconds; using std::chrono::milliseconds; namespace hbase { class OpenScannerResponse { public: OpenScannerResponse(std::shared_ptr<hbase::RpcClient> rpc_client, const std::unique_ptr<Response>& resp, std::shared_ptr<RegionLocation> region_location, std::shared_ptr<hbase::HBaseRpcController> controller) : rpc_client_(rpc_client), region_location_(region_location), controller_(controller) { scan_resp_ = std::static_pointer_cast<pb::ScanResponse>(resp->resp_msg()); cell_scanner_ = resp->cell_scanner(); } std::shared_ptr<hbase::RpcClient> rpc_client_; std::shared_ptr<pb::ScanResponse> scan_resp_; std::shared_ptr<RegionLocation> region_location_; std::shared_ptr<hbase::HBaseRpcController> controller_; std::shared_ptr<CellScanner> cell_scanner_; }; class AsyncClientScanner : public std::enable_shared_from_this<AsyncClientScanner> { public: template <typename... T> static std::shared_ptr<AsyncClientScanner> Create(T&&... all) { return std::shared_ptr<AsyncClientScanner>(new AsyncClientScanner(std::forward<T>(all)...)); } void Start(); private: // methods AsyncClientScanner(std::shared_ptr<AsyncConnection> conn, std::shared_ptr<Scan> scan, std::shared_ptr<pb::TableName> table_name, std::shared_ptr<RawScanResultConsumer> consumer, nanoseconds pause, uint32_t max_retries, nanoseconds scan_timeout_nanos, nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count); folly::Future<std::shared_ptr<OpenScannerResponse>> CallOpenScanner( std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<hbase::HBaseRpcController> controller, std::shared_ptr<hbase::RegionLocation> loc); void OpenScanner(); void StartScan(std::shared_ptr<OpenScannerResponse> resp); RegionLocateType GetLocateType(const Scan& scan); private: // data std::shared_ptr<AsyncConnection> conn_; std::shared_ptr<Scan> scan_; std::shared_ptr<pb::TableName> table_name_; std::shared_ptr<ScanResultCache> results_cache_; std::shared_ptr<RawScanResultConsumer> consumer_; nanoseconds pause_; uint32_t max_retries_; nanoseconds scan_timeout_nanos_; nanoseconds rpc_timeout_nanos_; uint32_t start_log_errors_count_; uint32_t max_attempts_; uint32_t open_scanner_tries_ = 0; }; } // namespace hbase