public ThriftClientPool()

in amoro-common/src/main/java/org/apache/amoro/client/ThriftClientPool.java [48:148]


  /**
   * Builds a pool of Thrift clients for the service addressed by {@code url}.
   *
   * <p>Side effect: the supplied {@code config} object is modified so that test-on-borrow and
   * test-on-return are both enabled, regardless of what the caller configured.
   *
   * <p>Each pooled client is created by parsing {@code url} (together with {@code serviceName}),
   * opening a transport to the resulting host/port and handing that transport to {@code factory}.
   * When the first open attempt fails and the config enables auto-reconnect with a positive
   * reconnect count, the URL is re-parsed and the open is retried up to that many times, pausing
   * {@code RECONNECT_INTERVAL} milliseconds between attempts.
   *
   * @param url service URL; must be neither {@code null} nor empty
   * @param factory creates the Thrift client on top of an opened transport; must not be {@code
   *     null}
   * @param pingFactory stored as-is; it is not null-checked by this constructor
   * @param config pool configuration; must not be {@code null} (and is mutated, see above)
   * @param serviceName passed to the URL parser together with {@code url}
   * @throws IllegalArgumentException if {@code url} is null/empty, or {@code factory} or {@code
   *     config} is null
   */
  public ThriftClientPool(
      String url,
      ThriftClientFactory factory,
      ThriftPingFactory pingFactory,
      PoolConfig<T> config,
      String serviceName) {
    if (url == null || url.isEmpty()) {
      throw new IllegalArgumentException("url is empty!");
    }
    if (factory == null) {
      throw new IllegalArgumentException("factory is empty!");
    }
    if (config == null) {
      throw new IllegalArgumentException("config is empty!");
    }

    this.clientFactory = factory;
    this.pingFactory = pingFactory;
    this.poolConfig = config;
    // Always test clients when they are borrowed and when they are returned; this overrides
    // whatever the caller set on the config.
    this.poolConfig.setTestOnReturn(true);
    this.poolConfig.setTestOnBorrow(true);
    this.pool =
        new GenericObjectPool<>(
            new BasePooledObjectFactory<ThriftClient<T>>() {

              /**
               * Opens a transport to the service (retrying if configured) and wraps it in a new
               * client.
               */
              @Override
              public ThriftClient<T> create() throws Exception {

                // Resolve the service address from the URL on every creation.
                AmsThriftUrl amsThriftUrl = AmsThriftUrl.parse(url, serviceName);
                ServiceInfo serviceInfo = new ServiceInfo(amsThriftUrl.host(), amsThriftUrl.port());
                TTransport transport = getTransport(serviceInfo);

                try {
                  transport.open();
                } catch (TTransportException e) {
                  // Log the cause as well, consistent with the retry-failure log below.
                  LOG.warn(
                      "Transport open failed, service address: {}:{}",
                      serviceInfo.getHost(),
                      serviceInfo.getPort(),
                      e);
                  if (poolConfig.isAutoReconnect() && poolConfig.getMaxReconnects() > 0) {
                    for (int i = 0; i < poolConfig.getMaxReconnects(); i++) {
                      try {
                        // Re-parse the URL on each attempt: the resolved address may differ
                        // from the one that just failed.
                        amsThriftUrl = AmsThriftUrl.parse(url, serviceName);
                        serviceInfo.setHost(amsThriftUrl.host());
                        serviceInfo.setPort(amsThriftUrl.port());
                        transport = getTransport(serviceInfo);
                        LOG.info(
                            "Reconnecting service address: {}:{}",
                            serviceInfo.getHost(),
                            serviceInfo.getPort());
                        transport.open();
                        break;
                      } catch (TTransportException e2) {
                        LOG.warn(
                            "Reconnected service address: {}:{} failed",
                            serviceInfo.getHost(),
                            serviceInfo.getPort(),
                            e2);
                      }
                      // Back off only when another attempt will follow; sleeping after the
                      // final failure would merely delay the exception below.
                      if (i + 1 < poolConfig.getMaxReconnects()) {
                        try {
                          TimeUnit.MILLISECONDS.sleep(RECONNECT_INTERVAL);
                        } catch (InterruptedException interrupted) {
                          // Keep the interrupt visible to callers further up the stack.
                          Thread.currentThread().interrupt();
                          throw interrupted;
                        }
                      }
                    }
                    if (!transport.isOpen()) {
                      throw new ConnectionFailException(
                          String.format(
                              "Connected error after %d retries, last address: %s:%d",
                              poolConfig.getMaxReconnects(),
                              serviceInfo.getHost(),
                              serviceInfo.getPort()),
                          e);
                    }
                  } else {
                    throw new ConnectionFailException(
                        String.format(
                            "Connected error, address: %s:%d",
                            serviceInfo.getHost(), serviceInfo.getPort()),
                        e);
                  }
                }

                // From here on the transport is open; make sure it is closed again if the
                // client cannot be built, otherwise the connection would leak.
                boolean clientCreated = false;
                try {
                  ThriftClient<T> client =
                      new ThriftClient<>(clientFactory.createClient(transport), pool, serviceInfo);
                  clientCreated = true;

                  LOG.debug("created new thrift pool for url:{}", url);
                  return client;
                } finally {
                  if (!clientCreated) {
                    transport.close();
                  }
                }
              }

              /** Wraps a freshly created client in the pool's default bookkeeping object. */
              @Override
              public PooledObject<ThriftClient<T>> wrap(ThriftClient<T> obj) {
                return new DefaultPooledObject<>(obj);
              }

              /** Closes the underlying client before the pool discards the pooled object. */
              @Override
              public void destroyObject(PooledObject<ThriftClient<T>> p) throws Exception {
                p.getObject().closeClient();
                super.destroyObject(p);
              }
            },
            poolConfig);
  }