in flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java [90:117]
public Collection<RowData> lookup(RowData keyRow) throws IOException {
    // Looks up the HBase row addressed by field 0 of keyRow.
    //
    // Returns a single-element collection holding the converted row on a hit, and an empty
    // collection when serde produces no Get for the key or the fetched Result is empty.
    //
    // An IOException triggers a retry, up to maxRetryTimes retries in total. The pause before
    // each retry grows linearly (1000 ms * retry index), so the first retry happens without
    // any delay. Once the retries are exhausted, or if the pause is interrupted, a
    // RuntimeException wrapping the cause is thrown.
    for (int retry = 0; retry <= maxRetryTimes; retry++) {
        try {
            // TODO: The implementation of LookupFunction will pass a GenericRowData as key row
            // and it's safe to cast for now. We need to update the logic once we improve the
            // LookupFunction in the future.
            Get get = serde.createGet(((GenericRowData) keyRow).getField(0));
            if (get != null) {
                Result result = table.get(get);
                if (!result.isEmpty()) {
                    // NOTE(review): judging by the method name the returned row is a reused
                    // instance -- confirm before retaining references to it across calls.
                    return Collections.singletonList(serde.convertToReusedRow(result));
                }
            }
            // The lookup completed without error but found nothing.
            return Collections.emptyList();
        } catch (IOException e) {
            LOG.error(String.format("HBase lookup error, retry times = %d", retry), e);
            if (retry >= maxRetryTimes) {
                throw new RuntimeException("Execution of HBase lookup failed.", e);
            }
            try {
                // Long arithmetic so a very large retry index cannot overflow into a
                // negative sleep duration.
                Thread.sleep(1000L * retry);
            } catch (InterruptedException e1) {
                // Restore the interrupt status so callers further up the stack can still
                // observe that this thread was asked to stop.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e1);
            }
        }
    }
    // Only reached when the loop body never runs (maxRetryTimes is negative).
    return Collections.emptyList();
}