public void run()

in solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java [167:406]


  public void run() {
    MDCLoggingContext.setNode(thisNode);
    log.debug("Process current queue of overseer operations");
    LeaderStatus isLeader = amILeader();
    while (isLeader == LeaderStatus.DONT_KNOW) {
      log.debug("am_i_leader unclear {}", isLeader);
      isLeader = amILeader(); // not a no, not a yes, try asking again
    }

    String oldestItemInWorkQueue = null;
    // hasLeftOverItems - guards against re-executing async tasks that were already processed by a
    // previous Overseer. It is set when any task is found on the workQueue at OCP startup; the id
    // of the queue tail is then used as a marker, and tasks up to that marker are checked against
    // the completed/failed maps in ZK. Beyond the marker, all tasks can safely be assumed to have
    // never been executed.
    boolean hasLeftOverItems = true;

    try {
      oldestItemInWorkQueue = workQueue.getTailId();
    } catch (KeeperException e) {
      // No need to handle this; it is only a fail-safe that helps skip already-processed async
      // calls.
      log.error("KeeperException", e);
    } catch (IllegalStateException ignore) {
      return;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }

    if (oldestItemInWorkQueue == null) hasLeftOverItems = false;
    else
      log.debug(
          "Found already existing elements in the work-queue. Last element: {}",
          oldestItemInWorkQueue);

    try {
      prioritizer.prioritizeOverseerNodes(myId);
    } catch (IllegalStateException ignore) {
      return;
    } catch (Exception e) {
      if (!zkStateReader.getZkClient().isClosed()) {
        log.error("Unable to prioritize overseer ", e);
      }
    }

    // TODO: Make maxThreads configurable.

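    // Worker pool for task execution: 5 core threads growing up to MAX_PARALLEL_TASKS, with a
    // SynchronousQueue so each submitted Runner is handed directly to a free (or newly created)
    // thread rather than being queued.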
    this.tpe =
        new ExecutorUtil.MDCAwareThreadPoolExecutor(
            5,
            MAX_PARALLEL_TASKS,
            0L,
            TimeUnit.MILLISECONDS,
            new SynchronousQueue<>(),
            new SolrNamedThreadFactory("OverseerThreadFactory"));

    // In OverseerCollectionMessageHandler, a new Session needs to be created for each new iteration
    // over the tasks in the queue. Incrementing this id causes a new session to be created there.
    long batchSessionId = 0;

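    // Main dispatch loop: keeps fetching and dispatching tasks until the processor is closed or
    // this node stops being the leader.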
    try {
      while (!this.isClosed) {
        try {
          isLeader = amILeader();
          if (LeaderStatus.NO == isLeader) {
            break;
          } else if (LeaderStatus.YES != isLeader) {
            log.debug("am_i_leader unclear {}", isLeader);
            continue; // not a no, not a yes, try asking again
          }

          if (log.isDebugEnabled()) {
            log.debug(
                "Cleaning up work-queue. #Running tasks: {} #Completed tasks: {}",
                runningTasks.size(),
                completedTasks.size());
          }
          cleanUpWorkQueue();

          printTrackingMaps();

          boolean waited = false;

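          // Throttle: if the executor is already saturated, wait (in 100 ms slices) until a task
          // completes before fetching more work.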
          while (runningTasks.size() > MAX_PARALLEL_TASKS) {
            synchronized (waitLock) {
              waitLock.wait(100); // wait for 100 ms or till a task is complete
            }
            waited = true;
          }

          if (waited) cleanUpWorkQueue();

          ArrayList<QueueEvent> heads = new ArrayList<>(blockedTasks.size() + MAX_PARALLEL_TASKS);
          heads.addAll(blockedTasks.values());

          // If we already have enough items in the blocked tasks, it makes no sense to read more
          // from the work queue; clear out at least a few items in the queue before reading more.
          if (heads.size() < MAX_BLOCKED_TASKS) {
            // instead of reading MAX_PARALLEL_TASKS items always, we should only fetch as much as
            // we can execute
            int toFetch =
                Math.min(
                    MAX_BLOCKED_TASKS - heads.size(), MAX_PARALLEL_TASKS - runningTasks.size());
            List<QueueEvent> newTasks = workQueue.peekTopN(toFetch, excludedTasks, 2000L);
            if (log.isDebugEnabled()) {
              log.debug("Got {} tasks from work-queue : [{}]", newTasks.size(), newTasks);
            }
            // heads has at most MAX_BLOCKED_TASKS tasks.
            heads.addAll(newTasks);
          } else {
            // The sleep below slows down spinning when heads is full from a previous dispatch
            // attempt below and no new tasks got executed (all executors are busy, or all waiting
            // tasks require locks currently held by executors).
            //
            // When heads is not full but no progress was made (no new work got dispatched in the
            // for loop below), spinning is slowed down by the wait time of the call to
            // workQueue.peekTopN() above. (At least in theory: the method eventually called from
            // there is ZkDistributedQueue.peekElements(), and because it filters out entries that
            // have just completed on a Runner thread differently from the predicate-based
            // filtering, it can return quickly without waiting for the configured delay time.
            // Spinning can therefore be observed; likely something to clean up at some point.)
            //
            // If heads is not empty and new tasks appeared in the queue, there is no delay:
            // workQueue.peekTopN() above returns immediately.
            Thread.sleep(1000);
          }

          if (isClosed) break;

          if (heads.isEmpty()) {
            continue;
          }

          // Clear the blocked tasks; the map may get refilled below. Since blockedTasks can only
          // get entries from heads, and heads holds at most MAX_BLOCKED_TASKS tasks, blockedTasks
          // will never exceed MAX_BLOCKED_TASKS entries.
          // Note blockedTasks can't be cleared any earlier, as it is used in the excludedTasks
          // Predicate above.
          blockedTasks.clear();

          // Trigger the creation of a new Session used for locking when/if a lock is later acquired
          // on the OverseerCollectionMessageHandler
          batchSessionId++;

          boolean tooManyTasks = false;
          for (QueueEvent head : heads) {
            if (!tooManyTasks) {
              tooManyTasks = runningTasks.size() >= MAX_PARALLEL_TASKS;
            }
            if (tooManyTasks) {
              // Too many tasks are running, just shove the rest into the "blocked" queue.
              blockedTasks.put(head.getId(), head);
              continue;
            }
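            // Skip tasks that are already running.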
            if (runningZKTasks.contains(head.getId())) continue;
            final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
            final String asyncId = message.getStr(ASYNC);
            if (hasLeftOverItems) {
              if (head.getId().equals(oldestItemInWorkQueue)) hasLeftOverItems = false;
              if (asyncId != null
                  && (completedMap.contains(asyncId) || failureMap.contains(asyncId))) {
                log.debug(
                    "Found already processed task in workQueue, cleaning up. AsyncId [{}]",
                    asyncId);
                workQueue.remove(head);
                continue;
              }
            }
            String operation = message.getStr(Overseer.QUEUE_OPERATION);
            if (operation == null) {
              log.error("Msg does not have required {} : {}", Overseer.QUEUE_OPERATION, message);
              workQueue.remove(head);
              continue;
            }
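            // Pick the handler for this message and try to acquire its exclusivity lock; if the
            // lock cannot be acquired, park the task in blockedTasks and retry it on a later
            // iteration.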
            OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message);
            OverseerMessageHandler.Lock lock = messageHandler.lockTask(message, batchSessionId);
            if (lock == null) {
              if (log.isDebugEnabled()) {
                log.debug("Exclusivity check failed for [{}]", message);
              }
              blockedTasks.put(head.getId(), head);
              continue;
            }
            try {
              markTaskAsRunning(head, asyncId);
              if (log.isDebugEnabled()) {
                log.debug("Marked task [{}] as running", head.getId());
              }
            } catch (KeeperException.NodeExistsException e) {
              lock.unlock();
              // This should never happen
              log.error("Tried to pick up task [{}] when it was already running!", head.getId());
              continue;
            } catch (InterruptedException e) {
              lock.unlock();
              log.error(
                  "Thread interrupted while trying to pick task {} for execution.", head.getId());
              Thread.currentThread().interrupt();
              continue;
            }
            if (log.isDebugEnabled()) {
              log.debug(
                  "{}: Get the message id: {} message: {}",
                  messageHandler.getName(),
                  head.getId(),
                  message);
            }
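            // Hand the task off to the worker pool; the Runner executes the operation
            // asynchronously and is responsible for releasing the lock when it finishes.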
            Runner runner = new Runner(messageHandler, message, operation, head, lock);
            tpe.execute(runner);
          }

        } catch (KeeperException e) {
          if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
            log.warn("Overseer cannot talk to ZK");
            return;
          }
          log.error("KeeperException", e);

          // Prevent free-spinning this loop.
          try {
            Thread.sleep(1000);
          } catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
            return;
          }

        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        } catch (IllegalStateException ignore) {
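          // Ignored; loop around and retry on the next iteration.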

        } catch (Exception e) {
          log.error("Exception processing", e);
        }
      }
    } finally {
      this.close();
    }
  }