in twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java [347:421]
/**
 * Runs the main loop of the application master until {@code stopped} is set or there is nothing left to do.
 *
 * <p>Each iteration:
 * <ol>
 *   <li>calls {@code amClient.allocate}. Its handler forwards acquired launchers to {@code launchRunnable}
 *       and completed container statuses to {@code handleCompleted};</li>
 *   <li>exits the loop when nothing is provisioning, no container request is pending and no container is
 *       running;</li>
 *   <li>when nothing is provisioning, picks the next request that is ready to be provisioned and submits it
 *       via {@code addContainerRequests};</li>
 *   <li>clears the blacklist once a submitted request has been pending longer than
 *       {@code Constants.CONSTRAINED_PROVISION_REQUEST_TIMEOUT};</li>
 *   <li>runs {@code checkProvisionTimeout} and, while the service is running, sleeps for one second.</li>
 * </ol>
 *
 * @throws Exception if a call made by the loop fails (for example, the one-second sleep being interrupted)
 */
protected void doRun() throws Exception {
  // The main loop
  Map.Entry<AllocationSpecification, ? extends Collection<RuntimeSpecification>> currentRequest = null;
  final Queue<ProvisionRequest> provisioning = Lists.newLinkedList();

  YarnAMClient.AllocateHandler allocateHandler = new YarnAMClient.AllocateHandler() {
    @Override
    public void acquired(List<? extends ProcessLauncher<YarnContainerInfo>> launchers) {
      launchRunnable(launchers, provisioning);
    }

    @Override
    public void completed(List<YarnContainerStatus> completed) {
      handleCompleted(completed);
    }
  };

  long requestStartTime = 0;
  boolean isRequestRelaxed = false;
  long nextTimeoutCheck = System.currentTimeMillis() + Constants.PROVISION_TIMEOUT;
  while (!stopped) {
    // Call allocate. It has to be made at first in order to be able to get cluster resource availability.
    amClient.allocate(0.0f, allocateHandler);

    // Looks for containers requests.
    if (provisioning.isEmpty() && runnableContainerRequests.isEmpty() && runningContainers.isEmpty()) {
      LOG.info("All containers completed. Shutting down application master.");
      break;
    }

    // If nothing is in provisioning, and no pending request, move to next one.
    // Requests that are not ready are rotated to the tail of the queue. Not-ready requests are never removed
    // inside this loop, so once the number of consecutive skips reaches the queue size, every remaining
    // request has been examined and none is ready. Break out so the outer loop keeps calling allocate()
    // instead of spinning here forever.
    int notReadyCount = 0;
    while (provisioning.isEmpty() && currentRequest == null && !runnableContainerRequests.isEmpty()) {
      RunnableContainerRequest runnableContainerRequest = runnableContainerRequests.peek();
      if (!runnableContainerRequest.isReadyToBeProvisioned()) {
        // take it out from queue and put it back at the end for second chance.
        runnableContainerRequest = runnableContainerRequests.poll();
        runnableContainerRequests.add(runnableContainerRequest);
        if (++notReadyCount >= runnableContainerRequests.size()) {
          break;
        }
        continue;
      }
      currentRequest = runnableContainerRequest.takeRequest();
      if (currentRequest == null) {
        // All different types of resource request from current order is done, move to next one
        // TODO: Need to handle order type as well
        runnableContainerRequests.poll();
      }
    }

    // Nothing in provision, makes the next batch of provision request
    if (provisioning.isEmpty() && currentRequest != null) {
      manageBlacklist(currentRequest);
      addContainerRequests(currentRequest.getKey().getResource(), currentRequest.getValue(), provisioning,
                           currentRequest.getKey().getType());
      currentRequest = null;
      // Remember when this batch was submitted so the constrained-request timeout below can be measured.
      requestStartTime = System.currentTimeMillis();
      isRequestRelaxed = false;
    }

    // Check for provision request timeout i.e. check if any provision request has been pending
    // for more than the designated time. On timeout, relax the request constraints.
    if (!provisioning.isEmpty() && !isRequestRelaxed &&
        (System.currentTimeMillis() - requestStartTime) > Constants.CONSTRAINED_PROVISION_REQUEST_TIMEOUT) {
      LOG.info("Relaxing provisioning constraints for request {}", provisioning.peek().getRequestId());
      // Clear the blacklist for the pending provision request(s).
      amClient.clearBlacklist();
      isRequestRelaxed = true;
    }

    nextTimeoutCheck = checkProvisionTimeout(nextTimeoutCheck);

    if (isRunning()) {
      TimeUnit.SECONDS.sleep(1);
    }
  }
}