in samza-core/src/main/java/org/apache/samza/container/RunLoop.java [957:993]
/**
 * Reports whether this task may be given its next operation right now.
 *
 * <p>The call first refreshes the pending-operation flags ({@code endOfStream}, {@code shouldDrain},
 * {@code needCommit}); this method only ever sets them to {@code true} and never clears them. It then
 * applies the readiness rule that matches the highest-priority pending operation:
 * commit first, then window / scheduler / end-of-stream / drain, and finally regular message processing.
 *
 * @return {@code true} when the pending operation (or, if none is pending, processing of a new message)
 *         is allowed to start given what is currently in flight
 */
private boolean isReady() {
  if (checkEndOfStream()) {
    endOfStream = true;
  }
  if (shouldDrain()) {
    shouldDrain = true;
  }
  // Consume a commit request registered for this task, if there is one.
  if (coordinatorRequests.commitRequests().remove(taskName)) {
    needCommit = true;
  }

  // Snapshot: is any window, commit or scheduler operation currently running?
  boolean exclusiveOpRunning = windowInFlight || commitInFlight || schedulerInFlight;

  if (needCommit) {
    /*
     * Commit requested (by the user or the commit thread). It may start only while no window, commit or
     * scheduler operation is running, and additionally either
     *   a) no process call is in flight, or
     *   b) task.async.commit is enabled, in which case in-flight process calls are tolerated.
     */
    boolean noMessagesOrAsyncCommit = messagesInFlight.get() == 0 || isAsyncCommitEnabled;
    return noMessagesOrAsyncCommit && !exclusiveOpRunning;
  }

  if (needWindow || needScheduler || endOfStream || shouldDrain) {
    /*
     * Window, scheduler, drain and end-of-stream handling all require a fully idle task:
     * no process call in flight and no window, commit or scheduler operation running.
     */
    boolean noMessagesInFlight = messagesInFlight.get() == 0;
    return noMessagesInFlight && !exclusiveOpRunning;
  }

  /*
   * Nothing special is pending, so the question is whether another message can be processed.
   * That needs spare capacity (process calls in flight < task.max.concurrency), no window or scheduler
   * operation running, and no commit running unless task.async.commit is enabled.
   * The in-flight flags are read again here rather than taken from the snapshot above.
   */
  boolean hasSpareCapacity = messagesInFlight.get() < maxConcurrency;
  if (!hasSpareCapacity) {
    return false;
  }
  if (windowInFlight || schedulerInFlight) {
    return false;
  }
  return isAsyncCommitEnabled || !commitInFlight;
}