in amoro-common/src/main/java/org/apache/amoro/client/ThriftClientPool.java [48:148]
/**
 * Builds a pool of {@link ThriftClient}s that connect to the service described by {@code url}.
 *
 * <p>The given {@code config} is modified: validation on borrow and on return is always switched
 * on. Each pooled client is created by parsing {@code url} with {@code AmsThriftUrl.parse},
 * opening a transport to the resulting host and port and wrapping it with the client produced by
 * {@code factory}. When the first open attempt fails and the config enables auto reconnect with
 * a positive reconnect count, the url is parsed again and the connection is retried, waiting
 * {@code RECONNECT_INTERVAL} milliseconds between attempts.
 *
 * @param url service url; must be neither {@code null} nor empty
 * @param factory creates the Thrift client on top of an opened transport; must not be {@code
 *     null}
 * @param pingFactory stored as given; it is not validated by this constructor
 * @param config pool configuration; must not be {@code null}
 * @param serviceName passed to {@code AmsThriftUrl.parse} together with {@code url}
 * @throws IllegalArgumentException if {@code url} is null or empty, or if {@code factory} or
 *     {@code config} is null
 */
public ThriftClientPool(
    String url,
    ThriftClientFactory factory,
    ThriftPingFactory pingFactory,
    PoolConfig<T> config,
    String serviceName) {
  if (url == null || url.isEmpty()) {
    throw new IllegalArgumentException("url is empty!");
  }
  if (factory == null) {
    throw new IllegalArgumentException("factory is empty!");
  }
  if (config == null) {
    throw new IllegalArgumentException("config is empty!");
  }
  this.clientFactory = factory;
  this.pingFactory = pingFactory;
  this.poolConfig = config;
  // Always validate clients on borrow and on return, whatever the caller configured.
  // Note that this mutates the config object passed in by the caller.
  this.poolConfig.setTestOnReturn(true);
  this.poolConfig.setTestOnBorrow(true);
  this.pool =
      new GenericObjectPool<>(
          new BasePooledObjectFactory<ThriftClient<T>>() {
            /**
             * Opens a transport to the service and wraps it into a new pooled client.
             *
             * @throws ConnectionFailException if the transport cannot be opened, including after
             *     all configured reconnect attempts
             */
            @Override
            public ThriftClient<T> create() throws Exception {
              AmsThriftUrl amsThriftUrl = AmsThriftUrl.parse(url, serviceName);
              ServiceInfo serviceInfo = new ServiceInfo(amsThriftUrl.host(), amsThriftUrl.port());
              TTransport transport = getTransport(serviceInfo);
              try {
                transport.open();
              } catch (TTransportException e) {
                LOG.warn(
                    "Transport open failed, service address: {}:{}",
                    serviceInfo.getHost(),
                    serviceInfo.getPort(),
                    e);
                transport = reconnect(serviceInfo, e);
              }

              boolean created = false;
              try {
                ThriftClient<T> client =
                    new ThriftClient<>(clientFactory.createClient(transport), pool, serviceInfo);
                LOG.debug("created new thrift pool for url:{}", url);
                created = true;
                return client;
              } finally {
                if (!created) {
                  // Nobody owns the opened transport if client construction failed; close it
                  // here so the connection is not leaked.
                  closeQuietly(transport);
                }
              }
            }

            /**
             * Retries the connection after the first open attempt failed.
             *
             * <p>The url is parsed again before every attempt and {@code serviceInfo} is updated
             * in place with the parsed host and port.
             *
             * @param serviceInfo address holder, updated with the address of each attempt
             * @param firstFailure failure of the initial open attempt, used as the cause of the
             *     exception thrown when no connection can be established
             * @return an opened transport
             * @throws ConnectionFailException if reconnecting is disabled or every attempt fails
             */
            private TTransport reconnect(
                ServiceInfo serviceInfo, TTransportException firstFailure) throws Exception {
              int maxReconnects = poolConfig.getMaxReconnects();
              if (!poolConfig.isAutoReconnect() || maxReconnects <= 0) {
                throw new ConnectionFailException(
                    String.format(
                        "Connected error, address: %s:%d",
                        serviceInfo.getHost(), serviceInfo.getPort()),
                    firstFailure);
              }
              for (int attempt = 1; attempt <= maxReconnects; attempt++) {
                try {
                  AmsThriftUrl amsThriftUrl = AmsThriftUrl.parse(url, serviceName);
                  serviceInfo.setHost(amsThriftUrl.host());
                  serviceInfo.setPort(amsThriftUrl.port());
                  TTransport transport = getTransport(serviceInfo);
                  LOG.info(
                      "Reconnecting service address: {}:{}",
                      serviceInfo.getHost(),
                      serviceInfo.getPort());
                  transport.open();
                  return transport;
                } catch (TTransportException e2) {
                  LOG.warn(
                      "Reconnected service address: {}:{} failed",
                      serviceInfo.getHost(),
                      serviceInfo.getPort(),
                      e2);
                }
                // Only wait when another attempt follows; sleeping after the last failure
                // would merely delay the error.
                if (attempt < maxReconnects) {
                  TimeUnit.MILLISECONDS.sleep(RECONNECT_INTERVAL);
                }
              }
              throw new ConnectionFailException(
                  String.format(
                      "Connected error after %d retries, last address: %s:%d",
                      maxReconnects, serviceInfo.getHost(), serviceInfo.getPort()),
                  firstFailure);
            }

            /** Closes the transport, logging instead of propagating any runtime failure. */
            private void closeQuietly(TTransport transport) {
              try {
                transport.close();
              } catch (RuntimeException closeFailure) {
                LOG.debug("Failed to close transport", closeFailure);
              }
            }

            @Override
            public PooledObject<ThriftClient<T>> wrap(ThriftClient<T> obj) {
              return new DefaultPooledObject<>(obj);
            }

            /** Closes the underlying client before the pool discards the pooled object. */
            @Override
            public void destroyObject(PooledObject<ThriftClient<T>> p) throws Exception {
              p.getObject().closeClient();
              super.destroyObject(p);
            }
          },
          poolConfig);
}