flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/AbstractTableInputFormat.java [143:195]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Reads the next row from the current result scanner and maps it to the output type.
     *
     * <p>If fetching the row fails, the failure is logged, the scanner is closed, and one retry is
     * made with a fresh scanner positioned relative to {@code currentRow}. A failure of that retry
     * is not caught here and propagates to the caller.
     *
     * @param reuse object offered for reuse; this implementation does not use it
     * @return the mapped record, or {@code null} once the scanner returns no further row
     * @throws IOException if no result scanner is available, or if the retry attempt fails with an
     *     I/O error
     */
    public T nextRecord(T reuse) throws IOException {
        if (resultScanner == null) {
            throw new IOException("No table result scanner provided!");
        }
        Result res;
        try {
            res = resultScanner.next();
        } catch (Exception e) {
            // workaround for timeout on scan
            // Log before closing so the original failure is still reported if close() fails too.
            LOG.warn("Error after scan of {} rows. Retry with a new scanner...", scannedRows, e);
            resultScanner.close();
            // currentRow is only advanced to a row that has already been returned (it is updated
            // together with scannedRows below). While scannedRows is still 0, the row at
            // currentRow has not been handed out yet, so the restart must include it; otherwise
            // the restart resumes strictly after the last returned row.
            // NOTE(review): assumes scannedRows is reset to 0 when a split is opened - confirm.
            scan.withStartRow(currentRow, scannedRows == 0);
            resultScanner = table.getScanner(scan);
            res = resultScanner.next();
        }

        if (res != null) {
            scannedRows++;
            currentRow = res.getRow();
            return mapResultToOutType(res);
        }

        // The scanner is exhausted: remember it so reachedEnd() reports completion.
        endReached = true;
        return null;
    }

    /**
     * Logs an INFO line describing a split: the action, this instance, the split number, its
     * hostnames and its start/end row keys.
     *
     * <p>An empty start or end key is printed as {@code "-"}.
     *
     * @param action verb placed at the beginning of the log message
     * @param split the split whose details are logged
     */
    private void logSplitInfo(String action, TableInputSplit split) {
        final int number = split.getSplitNumber();

        // Render both boundary keys as text first, then substitute the placeholder for empty ones.
        String startKey = Bytes.toString(split.getStartRow());
        String stopKey = Bytes.toString(split.getEndRow());
        if (startKey.isEmpty()) {
            startKey = "-";
        }
        if (stopKey.isEmpty()) {
            stopKey = "-";
        }

        final String[] hosts = split.getHostnames();
        LOG.info(
                "{} split (this={})[{}|{}|{}|{}]", action, this, number, hosts, startKey, stopKey);
    }

    /**
     * Reports the value of the {@code endReached} flag.
     *
     * @return the current value of {@code endReached}
     */
    @Override
    public boolean reachedEnd() throws IOException {
        return this.endReached;
    }

    @Override
    public void close() throws IOException {
        LOG.info("Closing split (scanned {} rows)", scannedRows);
        currentRow = null;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/AbstractTableInputFormat.java [148:200]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Reads the next row from the current result scanner and maps it to the output type.
     *
     * <p>If fetching the row fails, the failure is logged, the scanner is closed, and one retry is
     * made with a fresh scanner positioned relative to {@code currentRow}. A failure of that retry
     * is not caught here and propagates to the caller.
     *
     * @param reuse object offered for reuse; this implementation does not use it
     * @return the mapped record, or {@code null} once the scanner returns no further row
     * @throws IOException if no result scanner is available, or if the retry attempt fails with an
     *     I/O error
     */
    public T nextRecord(T reuse) throws IOException {
        if (resultScanner == null) {
            throw new IOException("No table result scanner provided!");
        }
        Result res;
        try {
            res = resultScanner.next();
        } catch (Exception e) {
            // workaround for timeout on scan
            // Log before closing so the original failure is still reported if close() fails too.
            LOG.warn("Error after scan of {} rows. Retry with a new scanner...", scannedRows, e);
            resultScanner.close();
            // currentRow is only advanced to a row that has already been returned (it is updated
            // together with scannedRows below). While scannedRows is still 0, the row at
            // currentRow has not been handed out yet, so the restart must include it; otherwise
            // the restart resumes strictly after the last returned row.
            // NOTE(review): assumes scannedRows is reset to 0 when a split is opened - confirm.
            scan.withStartRow(currentRow, scannedRows == 0);
            resultScanner = table.getScanner(scan);
            res = resultScanner.next();
        }

        if (res != null) {
            scannedRows++;
            currentRow = res.getRow();
            return mapResultToOutType(res);
        }

        // The scanner is exhausted: remember it so reachedEnd() reports completion.
        endReached = true;
        return null;
    }

    /**
     * Logs an INFO line describing a split: the action, this instance, the split number, its
     * hostnames and its start/end row keys.
     *
     * <p>An empty start or end key is printed as {@code "-"}.
     *
     * @param action verb placed at the beginning of the log message
     * @param split the split whose details are logged
     */
    private void logSplitInfo(String action, TableInputSplit split) {
        final int number = split.getSplitNumber();

        // Render both boundary keys as text first, then substitute the placeholder for empty ones.
        String startKey = Bytes.toString(split.getStartRow());
        String stopKey = Bytes.toString(split.getEndRow());
        if (startKey.isEmpty()) {
            startKey = "-";
        }
        if (stopKey.isEmpty()) {
            stopKey = "-";
        }

        final String[] hosts = split.getHostnames();
        LOG.info(
                "{} split (this={})[{}|{}|{}|{}]", action, this, number, hosts, startKey, stopKey);
    }

    /**
     * Reports the value of the {@code endReached} flag.
     *
     * @return the current value of {@code endReached}
     */
    @Override
    public boolean reachedEnd() throws IOException {
        return this.endReached;
    }

    @Override
    public void close() throws IOException {
        LOG.info("Closing split (scanned {} rows)", scannedRows);
        currentRow = null;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



