public void run()

in modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java [80:165]


  public void run() {
    // Main loop of this task. Until 'stopped' is set, each pass:
    // 1. waits for partition info and for the processor's queue to shrink,
    // 2. scans, in random order, every range whose retry time has been reached,
    // 3. sleeps until the earliest upcoming retry time (at least minSleepTime).

    List<TableRange> shuffledRanges = new ArrayList<>();
    Set<TableRange> currentRanges = new HashSet<>();

    // Queue size seen at the end of the previous pass (at startup for the first pass).
    int lastQueueSize = proccessor.size();

    while (!stopped.get()) {
      try {
        shuffledRanges.clear();
        currentRanges.clear();

        PartitionInfo partition = partitionManager.waitForPartitionInfo();

        // Hold off until the queue is no larger than half of the last observed size.
        while (proccessor.size() > lastQueueSize / 2 && !stopped.get()) {
          UtilWaitThread.sleep(50, stopped);
        }

        partition.getMyGroupsRanges().forEach(range -> {
          shuffledRanges.add(range);
          currentRanges.add(range);
        });
        Collections.shuffle(shuffledRanges, rand);
        // Forget per-range state for ranges that are no longer part of this partition.
        rangeData.keySet().retainAll(currentRanges);

        long minRetryTime = maxSleepTime + System.currentTimeMillis();
        ScanCounts totals = new ScanCounts();
        int scannedCount = 0;
        try {
          for (TableRange candidate : shuffledRanges) {
            TabletData data = rangeData.computeIfAbsent(candidate, key -> new TabletData());
            if (System.currentTimeMillis() >= data.retryTime) {
              if (!partition.equals(partitionManager.getPartitionInfo())) {
                // The partition info changed during this pass; abandon the remaining ranges.
                break;
              }

              ScanCounts scanned;
              try (Session session = proccessor
                  .beginAddingNotifications(rc -> candidate.contains(rc.getRow()))) {
                // Notifications may have been queued asynchronously for deletion. Let that
                // complete before scanning.
                env.getSharedResources().getBatchWriter().waitForAsyncFlush();

                scanned = scan(session, partition, candidate.getRange());
                scannedCount++;
              }

              data.updateScanCount(scanned.added, maxSleepTime);
              totals.added += scanned.added;
              totals.seen += scanned.seen;
              if (stopped.get()) {
                break;
              }
            }

            // Track the soonest time at which any range wants to be looked at again.
            minRetryTime = Math.min(data.retryTime, minRetryTime);
          }
        } catch (PartitionInfoChangedException ignored) {
          // Nothing to do; the pass over the ranges simply ends early.
        }

        // If the partition info differs from what this pass started with, sleep only the
        // minimum; otherwise sleep until the earliest retry time.
        long sleepTime = partition.equals(partitionManager.getPartitionInfo())
            ? Math.max(minSleepTime, minRetryTime - System.currentTimeMillis())
            : minSleepTime;

        lastQueueSize = proccessor.size();

        log.debug("Scanned {} of {} tablets. Notifications added: {} seen: {} queued: {}",
            scannedCount, shuffledRanges.size(), totals.added, totals.seen, lastQueueSize);

        if (!stopped.get()) {
          UtilWaitThread.sleep(sleepTime, stopped);
        }

      } catch (Exception e) {
        // Interruption-related failures are logged at debug level, everything else at
        // error level; in both cases the outer loop carries on.
        if (!isInterruptedException(e)) {
          log.error("Error while looking for notifications", e);
        } else {
          log.debug("Error while looking for notifications", e);
        }
      }
    }
  }