include/hbase/client/async-connection.h (84 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #pragma once #include <folly/futures/Future.h> #include <folly/io/IOBuf.h> #include <wangle/concurrent/CPUThreadPoolExecutor.h> #include <wangle/concurrent/IOThreadPoolExecutor.h> #include <memory> #include <string> #include <utility> #include "hbase/connection/rpc-client.h" #include "hbase/client/async-region-locator.h" #include "hbase/client/configuration.h" #include "hbase/client/connection-configuration.h" #include "hbase/client/hbase-configuration-loader.h" #include "hbase/client/hbase-rpc-controller.h" #include "hbase/client/keyvalue-codec.h" #include "hbase/client/location-cache.h" #include "Cell.pb.h" #include "hbase/serde/table-name.h" namespace hbase { class AsyncRpcRetryingCallerFactory; class AsyncConnection { public: AsyncConnection() {} virtual ~AsyncConnection() {} virtual std::shared_ptr<Configuration> conf() = 0; virtual std::shared_ptr<ConnectionConfiguration> connection_conf() = 0; virtual std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() = 0; virtual std::shared_ptr<RpcClient> rpc_client() = 0; virtual std::shared_ptr<AsyncRegionLocator> region_locator() = 0; virtual std::shared_ptr<HBaseRpcController> CreateRpcController() = 0; virtual std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() = 0; virtual std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() = 0; virtual std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() = 0; virtual void Close() = 0; }; class AsyncConnectionImpl : public AsyncConnection, public std::enable_shared_from_this<AsyncConnectionImpl> { public: virtual ~AsyncConnectionImpl(); // See https://mortoray.com/2013/08/02/safely-using-enable_shared_from_this/ template <typename... T> static std::shared_ptr<AsyncConnectionImpl> Create(T&&... all) { auto conn = std::shared_ptr<AsyncConnectionImpl>(new AsyncConnectionImpl(std::forward<T>(all)...)); conn->Init(); return conn; } std::shared_ptr<Configuration> conf() override { return conf_; } std::shared_ptr<ConnectionConfiguration> connection_conf() override { return connection_conf_; } std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override { return caller_factory_; } std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; } std::shared_ptr<LocationCache> location_cache() { return location_cache_; } std::shared_ptr<AsyncRegionLocator> region_locator() override { return location_cache_; } std::shared_ptr<HBaseRpcController> CreateRpcController() override { return std::make_shared<HBaseRpcController>(); } std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; } std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; } std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override { return retry_executor_; } void Close() override; protected: AsyncConnectionImpl() {} private: /** Parameter name for HBase client IO thread pool size. Defaults to num cpus */ static constexpr const char* kClientIoThreadPoolSize = "hbase.client.io.thread.pool.size"; /** Parameter name for HBase client CPU thread pool size. Defaults to (2 * num cpus) */ static constexpr const char* kClientCpuThreadPoolSize = "hbase.client.cpu.thread.pool.size"; /** The RPC codec to encode cells. For now it is KeyValueCodec */ static constexpr const char* kRpcCodec = "hbase.client.rpc.codec"; std::shared_ptr<Configuration> conf_; std::shared_ptr<ConnectionConfiguration> connection_conf_; std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_; std::shared_ptr<folly::HHWheelTimer> retry_timer_; std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_; std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_; std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_; std::shared_ptr<LocationCache> location_cache_; std::shared_ptr<RpcClient> rpc_client_; bool is_closed_ = false; private: explicit AsyncConnectionImpl(std::shared_ptr<Configuration> conf) : conf_(conf) {} void Init(); }; } // namespace hbase