in flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java [136:175]
/**
 * Issues an asynchronous HBase {@code Get} for {@code rowKey} and completes {@code resultFuture}
 * with the outcome, retrying failed lookups.
 *
 * <p>Outcomes:
 *
 * <ul>
 *   <li>Empty result: the future completes with an empty list.
 *   <li>Non-empty result: the future completes with a single converted row.
 *   <li>{@code TableNotFoundException}: the future fails immediately with a
 *       {@link RuntimeException} wrapping the cause; no retry is attempted.
 *   <li>Any other failure: retried until {@code currentRetry >= maxRetryTimes}, then the future
 *       fails with the last throwable. Before each retry the callback thread sleeps for
 *       {@code 1000 ms * currentRetry}.
 *   <li>Interruption during the back-off sleep: the interrupt flag is restored, the future fails
 *       with the {@link InterruptedException}, and no further retry is issued.
 * </ul>
 *
 * @param resultFuture future to complete with the lookup result or the failure
 * @param currentRetry number of retries already performed for this row key; also scales the
 *     back-off delay
 * @param rowKey row key to look up
 */
private void fetchResult(
        CompletableFuture<Collection<RowData>> resultFuture, int currentRetry, Object rowKey) {
    Get get = serde.createGet(rowKey);
    CompletableFuture<Result> responseFuture = table.get(get);
    responseFuture.whenCompleteAsync(
            (result, throwable) -> {
                if (throwable != null) {
                    if (throwable instanceof TableNotFoundException) {
                        // A missing table will not appear by retrying, so fail fast.
                        LOG.error("Table '{}' not found ", hTableName, throwable);
                        resultFuture.completeExceptionally(
                                new RuntimeException(
                                        "HBase table '" + hTableName + "' not found.",
                                        throwable));
                    } else {
                        LOG.error(
                                String.format(
                                        "HBase asyncLookup error, retry times = %d",
                                        currentRetry),
                                throwable);
                        if (currentRetry >= maxRetryTimes) {
                            resultFuture.completeExceptionally(throwable);
                        } else {
                            try {
                                // long arithmetic so a large retry count cannot overflow
                                // into a negative (and therefore illegal) sleep duration.
                                Thread.sleep(1000L * currentRetry);
                            } catch (InterruptedException e1) {
                                // Restore the interrupt flag for code further up the stack
                                // and stop: the future is failed, so retrying would only
                                // send a request whose result nobody can receive.
                                Thread.currentThread().interrupt();
                                resultFuture.completeExceptionally(e1);
                                return;
                            }
                            fetchResult(resultFuture, currentRetry + 1, rowKey);
                        }
                    }
                } else {
                    if (result.isEmpty()) {
                        resultFuture.complete(Collections.emptyList());
                    } else {
                        resultFuture.complete(
                                Collections.singletonList(serde.convertToNewRow(result)));
                    }
                }
            });
}