in solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java [167:406]
public void run() {
MDCLoggingContext.setNode(thisNode);
log.debug("Process current queue of overseer operations");
LeaderStatus isLeader = amILeader();
while (isLeader == LeaderStatus.DONT_KNOW) {
log.debug("am_i_leader unclear {}", isLeader);
isLeader = amILeader(); // not a no, not a yes, try asking again
}
String oldestItemInWorkQueue = null;
// hasLeftOverItems is used to avoid re-executing async tasks that were already processed
// by a previous Overseer. It is set when any task is found on the workQueue as the OCP
// starts up; the id of the queue tail is used as a marker to check for the task in the
// completed/failed map in zk. Beyond the marker, all tasks can safely be assumed to have
// never been executed.
boolean hasLeftOverItems = true;
try {
oldestItemInWorkQueue = workQueue.getTailId();
} catch (KeeperException e) {
// We don't need to handle this. It is just a fail-safe that lets us skip
// already-processed async calls.
log.error("KeeperException", e);
} catch (IllegalStateException ignore) {
return;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (oldestItemInWorkQueue == null) hasLeftOverItems = false;
else
log.debug(
"Found already existing elements in the work-queue. Last element: {}",
oldestItemInWorkQueue);
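// Re-prioritize the overseer election queue (presumably so designated overseer nodes
// are first in line to take over leadership).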
try {
prioritizer.prioritizeOverseerNodes(myId);
} catch (IllegalStateException ignore) {
return;
} catch (Exception e) {
if (!zkStateReader.getZkClient().isClosed()) {
log.error("Unable to prioritize overseer ", e);
}
}
// TODO: Make maxThreads configurable.
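// Core pool of 5 threads, growing on demand up to MAX_PARALLEL_TASKS; with a
// SynchronousQueue, each submitted Runner is handed directly to a thread rather
// than queued.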
this.tpe =
new ExecutorUtil.MDCAwareThreadPoolExecutor(
5,
MAX_PARALLEL_TASKS,
0L,
TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
new SolrNamedThreadFactory("OverseerThreadFactory"));
// In OverseerCollectionMessageHandler, a new Session needs to be created for each new iteration
// over the tasks in the queue. Incrementing this id causes a new session to be created there.
long batchSessionId = 0;
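// Main dispatch loop: keep draining the work queue until the processor is closed or
// this node stops being the leader.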
try {
while (!this.isClosed) {
try {
isLeader = amILeader();
if (LeaderStatus.NO == isLeader) {
break;
} else if (LeaderStatus.YES != isLeader) {
log.debug("am_i_leader unclear {}", isLeader);
continue; // not a no, not a yes, try asking again
}
if (log.isDebugEnabled()) {
log.debug(
"Cleaning up work-queue. #Running tasks: {} #Completed tasks: {}",
runningTasks.size(),
completedTasks.size());
}
cleanUpWorkQueue();
printTrackingMaps();
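// Throttle: while the number of running tasks exceeds MAX_PARALLEL_TASKS, back off,
// waking every 100 ms or when a completing task notifies waitLock.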
boolean waited = false;
while (runningTasks.size() > MAX_PARALLEL_TASKS) {
synchronized (waitLock) {
waitLock.wait(100); // wait for 100 ms or till a task is complete
}
waited = true;
}
if (waited) cleanUpWorkQueue();
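// Tasks blocked in the previous iteration get the first shot at dispatch, ahead of
// anything newly fetched from the work queue.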
ArrayList<QueueEvent> heads = new ArrayList<>(blockedTasks.size() + MAX_PARALLEL_TASKS);
heads.addAll(blockedTasks.values());
// If we already have enough items among the blocked tasks, it makes no sense to read
// more items from the work queue. It makes sense to clear out at least a few items
// in the queue before we read more.
if (heads.size() < MAX_BLOCKED_TASKS) {
// Instead of always reading MAX_PARALLEL_TASKS items, we should only fetch as many
// as we can execute.
int toFetch =
Math.min(
MAX_BLOCKED_TASKS - heads.size(), MAX_PARALLEL_TASKS - runningTasks.size());
List<QueueEvent> newTasks = workQueue.peekTopN(toFetch, excludedTasks, 2000L);
if (log.isDebugEnabled()) {
log.debug("Got {} tasks from work-queue : [{}]", newTasks.size(), newTasks);
}
// heads has at most MAX_BLOCKED_TASKS tasks.
heads.addAll(newTasks);
} else {
// The sleep below slows down spinning when heads is full from a previous work dispatch
// attempt (the for loop below) and no new tasks got executed: all executors are busy,
// or all waiting tasks require locks currently held by executors.
//
// When heads is not full but no progress was made (no new work got dispatched in the
// for loop below), the spinning is slowed down by the wait time in the call to
// workQueue.peekTopN() above. At least in theory: the method eventually called from
// there is ZkDistributedQueue.peekElements(), and because it filters out entries that
// have just completed on a Runner thread in a different way than the predicate-based
// filtering, it can return quickly without waiting the configured delay time. Spinning
// can therefore be observed; likely something to clean up at some point.
//
// If heads is not empty and new tasks appeared in the queue, there's no delay:
// workQueue.peekTopN() above will return immediately.
Thread.sleep(1000);
}
if (isClosed) break;
if (heads.isEmpty()) {
continue;
}
// Clear the blocked tasks; the map may get refilled below. Given blockedTasks can only
// get entries from heads, and heads has at most MAX_BLOCKED_TASKS tasks, blockedTasks
// will never exceed MAX_BLOCKED_TASKS entries.
// Note blockedTasks can't be cleared too early, as it is used in the excludedTasks
// predicate above.
blockedTasks.clear();
// Trigger the creation of a new Session used for locking when/if a lock is later acquired
// on the OverseerCollectionMessageHandler
batchSessionId++;
boolean tooManyTasks = false;
for (QueueEvent head : heads) {
if (!tooManyTasks) {
tooManyTasks = runningTasks.size() >= MAX_PARALLEL_TASKS;
}
if (tooManyTasks) {
// Too many tasks are running, just shove the rest into the "blocked" queue.
blockedTasks.put(head.getId(), head);
continue;
}
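// Skip tasks whose ids are already tracked as running.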
if (runningZKTasks.contains(head.getId())) continue;
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
final String asyncId = message.getStr(ASYNC);
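// Left-over handling: an async task already present in the completed or failed map was
// executed by a previous Overseer and only needs to be removed from the queue. Once the
// startup marker (oldestItemInWorkQueue) is seen, everything after it is new work.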
if (hasLeftOverItems) {
if (head.getId().equals(oldestItemInWorkQueue)) hasLeftOverItems = false;
if (asyncId != null
&& (completedMap.contains(asyncId) || failureMap.contains(asyncId))) {
log.debug(
"Found already processed task in workQueue, cleaning up. AsyncId [{}]",
asyncId);
workQueue.remove(head);
continue;
}
}
String operation = message.getStr(Overseer.QUEUE_OPERATION);
if (operation == null) {
log.error("Msg does not have required {} : {}", Overseer.QUEUE_OPERATION, message);
workQueue.remove(head);
continue;
}
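// Select the handler for this message type and try to take its task lock; a null lock
// means the exclusivity check failed, so the task is parked in blockedTasks for a
// later iteration.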
OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message);
OverseerMessageHandler.Lock lock = messageHandler.lockTask(message, batchSessionId);
if (lock == null) {
if (log.isDebugEnabled()) {
log.debug("Exclusivity check failed for [{}]", message);
}
blockedTasks.put(head.getId(), head);
continue;
}
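// Record the task as running before dispatching it; the NodeExistsException catch
// below implies this creates a per-task tracking node in ZK.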
try {
markTaskAsRunning(head, asyncId);
if (log.isDebugEnabled()) {
log.debug("Marked task [{}] as running", head.getId());
}
} catch (KeeperException.NodeExistsException e) {
lock.unlock();
// This should never happen
log.error("Tried to pick up task [{}] when it was already running!", head.getId());
continue;
} catch (InterruptedException e) {
lock.unlock();
log.error(
"Thread interrupted while trying to pick task {} for execution.", head.getId());
Thread.currentThread().interrupt();
continue;
}
if (log.isDebugEnabled()) {
log.debug(
"{}: Get the message id: {} message: {}",
messageHandler.getName(),
head.getId(),
message);
}
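// Dispatch the task for asynchronous execution; ownership of the lock passes to the
// Runner from here on.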
Runner runner = new Runner(messageHandler, message, operation, head, lock);
tpe.execute(runner);
}
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
log.warn("Overseer cannot talk to ZK");
return;
}
log.error("KeeperException", e);
// Prevent free-spinning this loop.
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
return;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (IllegalStateException ignore) {
} catch (Exception e) {
log.error("Exception processing", e);
}
}
} finally {
this.close();
}
}