in adb3client/src/main/java/com/alibaba/cloud/analyticdb/adb3client/impl/ExecutionPool.java [112:189]
/**
 * Creates an execution pool together with its thread factories, its Druid
 * connection pool, and its array of {@code Worker}s.
 *
 * <p>Nothing is started here: the {@code started} and {@code workerStated}
 * flags are both initialised to {@code false}.
 *
 * @param name        pool name; used as the prefix of every thread name produced by
 *                    the factories created here
 * @param config      client configuration supplying the JDBC settings, thread sizes
 *                    and cache parameters read below
 * @param isFixedPool when {@code true}, workers are built through the five-argument
 *                    {@code Worker} constructor (trailing {@code true}); otherwise
 *                    through the four-argument one
 */
public ExecutionPool(String name, AdbConfig config, boolean isFixedPool) {
    this.name = name;
    this.config = config;
    this.isFixedPool = isFixedPool;

    // The three factories differ only in the suffix appended to the pool name.
    workerThreadFactory = newNonDaemonThreadFactory("-worker");
    backgroundThreadFactory = newNonDaemonThreadFactory("-background");
    ontShotWorkerThreadFactory = newNonDaemonThreadFactory("-oneshot-worker");

    this.writeThreadSize = config.getWriteThreadSize();
    this.enableShutdownHook = config.isEnableShutdownHook();

    // Worker count:
    //  - fixed-FE mode is enabled and this is NOT the fixed pool (i.e. the FE pool used
    //    for sql/meta and other actions): connectionSizeWhenUseFixedFe;
    //  - every other case (plain FE mode, or the fixed pool itself): writeThreadSize.
    // NOTE(review): the original comment stated that the size should be the maximum of
    // the read and write concurrency, but only the write thread size is used here —
    // confirm this is intended.
    final int workerSize = (!config.isUseFixedFe() || isFixedPool)
            ? writeThreadSize
            : config.getConnectionSizeWhenUseFixedFe();

    workers = new Worker[workerSize];
    started = new AtomicBoolean(false);
    workerStated = new AtomicBoolean(false);

    // The data source must be fully configured before it is handed to the workers.
    dataSource = new DruidDataSource();
    configureDataSource(workerSize);

    for (int idx = 0; idx < workers.length; idx++) {
        workers[idx] = isFixedPool
                ? new Worker(config, dataSource, workerStated, idx, true)
                : new Worker(config, dataSource, workerStated, idx);
    }

    clientMap = new ConcurrentHashMap<>();
    byteSizeCache = new ByteSizeCache(config.getWriteBatchTotalByteSize());
    backgroundJob = new BackgroundJob(config);
    this.tableCache = new Cache<>(config.getMetaCacheTTL(), null);
}

/**
 * Returns a factory producing non-daemon threads named {@code <pool name><suffix>}.
 * The pool name is read each time a thread is created, not when the factory is built.
 */
private ThreadFactory newNonDaemonThreadFactory(final String suffix) {
    return new ThreadFactory() {
        @Override
        public Thread newThread(Runnable task) {
            Thread thread = new Thread(task);
            thread.setName(ExecutionPool.this.name + suffix);
            thread.setDaemon(false);
            return thread;
        }
    };
}

/**
 * Applies the connection-pool settings to {@code dataSource}, sized from {@code poolSize}.
 * Setter order is kept as originally written; do not reorder without checking Druid's
 * setter validation.
 */
private void configureDataSource(int poolSize) {
    // Connection identity.
    dataSource.setUrl(addRewriteBatchedStatements(config.getJdbcUrl()));
    dataSource.setUsername(config.getUsername());
    dataSource.setPassword(config.getPassword());
    dataSource.setDriverClassName("com.mysql.jdbc.Driver");

    // Sizing: initial and min-idle equal the worker count, max-active is twice that.
    dataSource.setInitialSize(poolSize);
    dataSource.setMinIdle(poolSize);
    dataSource.setMaxActive(poolSize * 2);
    dataSource.setMaxWait(60000);

    // Idle eviction.
    dataSource.setMinEvictableIdleTimeMillis(600000);
    dataSource.setMaxEvictableIdleTimeMillis(900000);
    dataSource.setTimeBetweenEvictionRunsMillis(2000);

    // Validation: only while idle, never on borrow or return.
    dataSource.setTestWhileIdle(true);
    dataSource.setTestOnBorrow(false);
    dataSource.setTestOnReturn(false);

    // Keep-alive and per-connection use limit.
    dataSource.setKeepAlive(true);
    dataSource.setKeepAliveBetweenTimeMillis(30000);
    dataSource.setPhyMaxUseCount(1000);
    dataSource.setValidationQuery("select 1");
}