in emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/util/TaskCalculator.java [37:92]
/**
 * Estimates the maximum number of map tasks the cluster can run for this job.
 *
 * <p>The estimate is expressed in "slots", where one slot is
 * {@code yarn.scheduler.minimum-allocation-mb} megabytes. The per-node slot count and the
 * slots required by a mapper, a reducer and the application master are derived from the job
 * configuration and handed to {@code yarnContainerAllocator.getMaxMappers(...)}, which
 * produces the mapper count.
 *
 * @return {@code 0} when the cluster reports no task trackers; otherwise the calculated
 *     number of mappers, raised to {@code 1} if the calculation yields less than one
 * @throws IOException declared by the method; presumably raised while querying the cluster
 *     status through the job client (confirm against the JobClient API)
 */
public int getMaxMapTasks() throws IOException {
  JobConf conf = (JobConf) jobClient.getConf();

  // Total number of nodes in the cluster
  int nodes = jobClient.getClusterStatus().getTaskTrackers();
  log.info("Cluster has " + nodes + " active nodes.");
  if (nodes == 0) {
    log.warn("Cluster doesn't have any nodes");
    return 0;
  }

  // Memory per slot. 1024 is the default value from yarn-default.xml.
  final int defaultSlotMemory = 1024;
  int slotMemory = conf.getInt("yarn.scheduler.minimum-allocation-mb", defaultSlotMemory);
  if (slotMemory <= 0) {
    // A non-positive slot size would make the integer division below throw
    // ArithmeticException (for 0) and would turn the ceil-based slot counts into
    // Integer.MAX_VALUE or negative numbers, so fall back to the default instead.
    log.warn("Invalid yarn.scheduler.minimum-allocation-mb value: " + slotMemory
        + ". Use " + defaultSlotMemory + " instead.");
    slotMemory = defaultSlotMemory;
  }

  // Number of slots in a core node (rounded down: a partial slot cannot host a container)
  int nodeMemory = nodeCapacityProvider.getCoreNodeMemoryMB();
  int nodeSlots = nodeMemory / slotMemory;

  // Number of slots for a mapper (rounded up to whole slots)
  int mapMemory = conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);
  int mapSlots = (int) Math.ceil((double) mapMemory / slotMemory);

  // Number of slots for an application master (rounded up to whole slots)
  int amMemory = conf.getInt(MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
  int appMasterSlots = (int) Math.ceil((double) amMemory / slotMemory);

  // Number of slots for a reducer (rounded up to whole slots)
  int reduceMemory = conf.getInt(MRJobConfig.REDUCE_MEMORY_MB,
      MRJobConfig.DEFAULT_REDUCE_MEMORY_MB);
  int reduceSlots = (int) Math.ceil((double) reduceMemory / slotMemory);

  // Number of reducers
  int reducers = conf.getNumReduceTasks();

  // Calculate the number of mappers
  int mappers = yarnContainerAllocator.getMaxMappers(nodes, reducers, nodeSlots,
      appMasterSlots, mapSlots, reduceSlots);

  log.info("Slot size: " + slotMemory + "MB.");
  log.info("Node manager can allocate " + nodeMemory + "MB (" + nodeSlots + " slots) for "
      + "containers on each node.");
  log.info("Each mapper needs: " + mapMemory + "MB. (" + mapSlots + " slots)");
  log.info("Each reducer needs: " + reduceMemory + "MB. (" + reduceSlots + " slots)");
  log.info("MapReduce Application Manager needs: " + amMemory + " MB. (" + appMasterSlots + " "
      + "slots)");
  log.info("Number of reducers: " + reducers);
  log.info("Max number of cluster map tasks: " + mappers);

  // Always allow at least one mapper so the job can make progress.
  if (mappers < 1) {
    log.warn("The calculated max number of concurrent map tasks is less than 1. Use 1 instead.");
    mappers = 1;
  }

  return mappers;
}