in flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/AbstractTableInputFormat.java [148:173]
public T nextRecord(T reuse) throws IOException {
    // Fetches the next row from the current scanner and returns it converted by
    // mapResultToOutType. Returns null (and sets endReached) once the scanner yields
    // no further row. The reuse argument is not used by this implementation.
    // Throws IOException when no scanner is available or when the single retry below
    // fails as well.
    if (resultScanner == null) {
        throw new IOException("No table result scanner provided!");
    }
    Result res;
    try {
        res = resultScanner.next();
    } catch (Exception e) {
        resultScanner.close();
        // workaround for timeout on scan
        LOG.warn("Error after scan of {} rows. Retry with a new scanner...", scannedRows, e);
        // Resume strictly after currentRow only when this method has already returned
        // that row. Before the first row is returned, currentRow does not identify an
        // emitted row (presumably it is the split start key set when the scanner was
        // opened -- confirm in open()), so an exclusive restart could silently drop a
        // row with exactly that key. In that case the scan is reopened unchanged.
        if (scannedRows > 0) {
            scan.withStartRow(currentRow, false);
        }
        // A failure of the retry is not caught here and propagates to the caller.
        resultScanner = table.getScanner(scan);
        res = resultScanner.next();
    }
    if (res != null) {
        scannedRows++;
        // Remember the last returned row key so that a retry can resume after it.
        currentRow = res.getRow();
        return mapResultToOutType(res);
    }
    endReached = true;
    return null;
}