in flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java [94:113]
public void open(FunctionContext context) {
LOG.info("start open ...");
final ExecutorService threadPool =
Executors.newFixedThreadPool(
THREAD_POOL_SIZE,
new ExecutorThreadFactory(
"hbase-async-lookup-worker", Threads.LOGGING_EXCEPTION_HANDLER));
Configuration config = prepareRuntimeConfiguration();
CompletableFuture<AsyncConnection> asyncConnectionFuture =
ConnectionFactory.createAsyncConnection(config);
try {
asyncConnection = asyncConnectionFuture.get();
table = asyncConnection.getTable(TableName.valueOf(hTableName), threadPool);
} catch (InterruptedException | ExecutionException e) {
LOG.error("Exception while creating connection to HBase.", e);
throw new RuntimeException("Cannot create connection to HBase.", e);
}
this.serde = new HBaseSerde(hbaseTableSchema, nullStringLiteral);
LOG.info("end open.");
}