in tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java [353:501]
/**
 * Selects workers for the given request and reserves resources on each of them.
 *
 * <p>The request names a prioritized resource: memory when its priority is
 * {@code ResourceRequestPriority.MEMORY}, disk slots otherwise. Workers are visited in random
 * order, repeatedly, in two passes:
 * <ol>
 *   <li>first, only workers that still have at least the requested <em>maximum</em> of the
 *       prioritized resource are used;</li>
 *   <li>once every worker fails that test, the threshold drops to the requested
 *       <em>minimum</em> and the workers are scanned again.</li>
 * </ol>
 * Each selected worker is granted the prioritized resource up to its requested maximum (or
 * whatever is left on the worker, if less). The other resource is not a selection criterion:
 * the worker is granted the larger of the requested min/max when it has that much available,
 * otherwise whatever it has left. Every grant is passed to
 * {@code WorkerResource.allocateResource(diskSlots, memoryMB)} before moving on.
 *
 * <p>The whole selection runs while holding the {@code rmContext} monitor.
 *
 * @param resourceRequest the request carrying the container count, the per-container
 *                        min/max memory and disk slots, and the resource priority
 * @return one entry per granted container; the list may hold fewer entries than the
 *         requested number of containers (including none) when the workers run out of the
 *         prioritized resource
 */
private List<AllocatedWorkerResource> chooseWorkers(WorkerResourceRequest resourceRequest) {
  List<AllocatedWorkerResource> selectedWorkers = new ArrayList<AllocatedWorkerResource>();

  boolean memoryPriority = resourceRequest.request.getResourceRequestPriority()
      == TajoMasterProtocol.ResourceRequestPriority.MEMORY;

  int numContainers = resourceRequest.request.getNumContainers();
  int minMemoryMB = resourceRequest.request.getMinMemoryMBPerContainer();
  int maxMemoryMB = resourceRequest.request.getMaxMemoryMBPerContainer();
  float minDiskSlots = resourceRequest.request.getMinDiskSlotPerContainer();
  float maxDiskSlots = resourceRequest.request.getMaxDiskSlotPerContainer();

  // Upper bound of each grant. The prioritized resource is capped at its requested maximum;
  // the other resource is capped at the larger of its requested min and max.
  int memoryCapMB = memoryPriority ? maxMemoryMB : Math.max(maxMemoryMB, minMemoryMB);
  float diskSlotCap = memoryPriority ? Math.max(maxDiskSlots, minDiskSlots) : maxDiskSlots;

  synchronized (rmContext) {
    // Snapshot and shuffle the worker ids so that containers are spread over random workers.
    List<String> randomWorkers = new ArrayList<String>(rmContext.getWorkers().keySet());
    Collections.shuffle(randomWorkers);

    int liveWorkerSize = randomWorkers.size();
    Set<String> insufficientWorkers = new HashSet<String>();

    // Pass 0 demands the requested maximum of the prioritized resource, pass 1 the minimum.
    for (int pass = 0; pass < 2; pass++) {
      boolean checkMax = (pass == 0);
      int memoryThresholdMB = checkMax ? maxMemoryMB : minMemoryMB;
      float diskSlotThreshold = checkMax ? maxDiskSlots : minDiskSlots;
      insufficientWorkers.clear();

      // Keep cycling over the workers until the request is satisfied or every worker has
      // been found insufficient for the current threshold.
      while (selectedWorkers.size() < numContainers
          && insufficientWorkers.size() < liveWorkerSize) {
        for (String eachWorker : randomWorkers) {
          if (selectedWorkers.size() >= numContainers
              || insufficientWorkers.size() >= liveWorkerSize) {
            break;
          }

          Worker worker = rmContext.getWorkers().get(eachWorker);
          if (worker == null) {
            // NOTE(review): the id came from the snapshot above, so null means the worker
            // was removed from the map in the meantime. Whether that can happen while this
            // monitor is held depends on code not visible here; treat it as insufficient
            // instead of failing with a NullPointerException. Counting it keeps the
            // termination condition of the enclosing loops intact.
            insufficientWorkers.add(eachWorker);
            continue;
          }

          WorkerResource workerResource = worker.getResource();
          int availableMemoryMB = workerResource.getAvailableMemoryMB();
          float availableDiskSlots = workerResource.getAvailableDiskSlots();

          boolean sufficient = memoryPriority
              ? availableMemoryMB >= memoryThresholdMB
              : availableDiskSlots >= diskSlotThreshold;
          if (!sufficient) {
            insufficientWorkers.add(eachWorker);
            continue;
          }

          AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource();
          allocatedWorkerResource.worker = worker;
          // Grant the cap when the worker can afford it, otherwise everything it has left.
          allocatedWorkerResource.allocatedMemoryMB =
              availableMemoryMB >= memoryCapMB ? memoryCapMB : availableMemoryMB;
          allocatedWorkerResource.allocatedDiskSlots =
              availableDiskSlots >= diskSlotCap ? diskSlotCap : availableDiskSlots;

          // Register the grant on the worker right away (presumably this lowers its
          // available resources, so the next visit sees the reduced amounts).
          workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
              allocatedWorkerResource.allocatedMemoryMB);

          selectedWorkers.add(allocatedWorkerResource);
        }
      }
    }
  }

  return selectedWorkers;
}