private void fetchResult()

in flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java [136:175]


    /**
     * Issues an asynchronous {@code Get} for {@code rowKey} and completes {@code resultFuture}
     * with the outcome.
     *
     * <p>On success the future is completed with an empty list when the HBase result is empty,
     * otherwise with a single converted row. A {@code TableNotFoundException} fails the future
     * immediately, wrapped in a {@code RuntimeException}. Any other failure is retried until
     * {@code currentRetry} reaches {@code maxRetryTimes}, sleeping {@code 1000 * currentRetry}
     * milliseconds before each retry; after that the original throwable fails the future.
     *
     * @param resultFuture future to complete with the lookup result or the failure
     * @param currentRetry number of retries already performed for this lookup
     * @param rowKey row key passed to the serde to build the {@code Get}
     */
    private void fetchResult(
            CompletableFuture<Collection<RowData>> resultFuture, int currentRetry, Object rowKey) {
        Get get = serde.createGet(rowKey);
        CompletableFuture<Result> responseFuture = table.get(get);
        responseFuture.whenCompleteAsync(
                (result, throwable) -> {
                    if (throwable != null) {
                        if (throwable instanceof TableNotFoundException) {
                            // A missing table is not retried: fail the lookup right away.
                            LOG.error("Table '{}' not found ", hTableName, throwable);
                            resultFuture.completeExceptionally(
                                    new RuntimeException(
                                            "HBase table '" + hTableName + "' not found.",
                                            throwable));
                        } else {
                            LOG.error(
                                    String.format(
                                            "HBase asyncLookup error, retry times = %d",
                                            currentRetry),
                                    throwable);
                            if (currentRetry >= maxRetryTimes) {
                                resultFuture.completeExceptionally(throwable);
                            } else {
                                try {
                                    // Linear back-off; note this blocks the thread running
                                    // the callback. Long arithmetic avoids int overflow.
                                    Thread.sleep(1000L * currentRetry);
                                } catch (InterruptedException e1) {
                                    // Restore the interrupt status and stop: the future is
                                    // failed, so issuing another request would be wasted work.
                                    Thread.currentThread().interrupt();
                                    resultFuture.completeExceptionally(e1);
                                    return;
                                }
                                fetchResult(resultFuture, currentRetry + 1, rowKey);
                            }
                        }
                    } else {
                        if (result.isEmpty()) {
                            resultFuture.complete(Collections.emptyList());
                        } else {
                            resultFuture.complete(
                                    Collections.singletonList(serde.convertToNewRow(result)));
                        }
                    }
                });
    }