in modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java [80:165]
public void run() {
List<TableRange> ranges = new ArrayList<>();
Set<TableRange> rangeSet = new HashSet<>();
int qSize = proccessor.size();
while (!stopped.get()) {
try {
ranges.clear();
rangeSet.clear();
PartitionInfo partition = partitionManager.waitForPartitionInfo();
while (proccessor.size() > qSize / 2 && !stopped.get()) {
UtilWaitThread.sleep(50, stopped);
}
partition.getMyGroupsRanges().forEach(t -> {
ranges.add(t);
rangeSet.add(t);
});
Collections.shuffle(ranges, rand);
rangeData.keySet().retainAll(rangeSet);
long minRetryTime = maxSleepTime + System.currentTimeMillis();
ScanCounts ntfyCounts = new ScanCounts();
int tabletsScanned = 0;
try {
for (TableRange tabletRange : ranges) {
TabletData tabletData = rangeData.computeIfAbsent(tabletRange, tr -> new TabletData());
if (System.currentTimeMillis() >= tabletData.retryTime) {
ScanCounts counts;
PartitionInfo pi = partitionManager.getPartitionInfo();
if (partition.equals(pi)) {
try (Session session =
proccessor.beginAddingNotifications(rc -> tabletRange.contains(rc.getRow()))) {
// notifications could have been asynchronously queued for deletion. Let that
// happen 1st before scanning
env.getSharedResources().getBatchWriter().waitForAsyncFlush();
counts = scan(session, partition, tabletRange.getRange());
tabletsScanned++;
}
} else {
break;
}
tabletData.updateScanCount(counts.added, maxSleepTime);
ntfyCounts.added += counts.added;
ntfyCounts.seen += counts.seen;
if (stopped.get()) {
break;
}
}
minRetryTime = Math.min(tabletData.retryTime, minRetryTime);
}
} catch (PartitionInfoChangedException mpce) {
// nothing to do
}
long sleepTime;
if (!partition.equals(partitionManager.getPartitionInfo())) {
sleepTime = minSleepTime;
} else {
sleepTime = Math.max(minSleepTime, minRetryTime - System.currentTimeMillis());
}
qSize = proccessor.size();
log.debug("Scanned {} of {} tablets. Notifications added: {} seen: {} queued: {}",
tabletsScanned, ranges.size(), ntfyCounts.added, ntfyCounts.seen, qSize);
if (!stopped.get()) {
UtilWaitThread.sleep(sleepTime, stopped);
}
} catch (Exception e) {
if (isInterruptedException(e)) {
log.debug("Error while looking for notifications", e);
} else {
log.error("Error while looking for notifications", e);
}
}
}
}