in hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java [226:301]
private Runnability assignRunnabilityRank(TaskCluster goal, Map<TaskCluster, Runnability> runnabilityMap) {
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Computing runnability: " + goal);
}
if (runnabilityMap.containsKey(goal)) {
return runnabilityMap.get(goal);
}
TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(goal);
if (lastAttempt != null) {
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Last Attempt Status: " + lastAttempt.getStatus());
}
if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
Runnability runnability = new Runnability(Runnability.Tag.COMPLETED, Integer.MIN_VALUE);
runnabilityMap.put(goal, runnability);
return runnability;
}
if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
Runnability runnability = new Runnability(Runnability.Tag.RUNNING, Integer.MIN_VALUE);
runnabilityMap.put(goal, runnability);
return runnability;
}
}
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = jobRun.getConnectorPolicyMap();
PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
Runnability aggregateRunnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
for (PartitionId pid : goal.getRequiredPartitions()) {
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Inspecting required partition: " + pid);
}
Runnability runnability;
ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
PartitionState maxState = pmm.getMaximumAvailableState(pid);
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Policy: " + cPolicy + " maxState: " + maxState);
}
if (PartitionState.COMMITTED.equals(maxState)) {
runnability = new Runnability(Runnability.Tag.RUNNABLE, 0);
} else if (PartitionState.STARTED.equals(maxState) && !cPolicy.consumerWaitsForProducerToFinish()) {
runnability = new Runnability(Runnability.Tag.RUNNABLE, 1);
} else {
runnability = assignRunnabilityRank(partitionProducingTaskClusterMap.get(pid), runnabilityMap);
switch (runnability.getTag()) {
case RUNNABLE:
if (cPolicy.consumerWaitsForProducerToFinish()) {
runnability = new Runnability(Runnability.Tag.NOT_RUNNABLE, Integer.MAX_VALUE);
} else {
runnability = new Runnability(Runnability.Tag.RUNNABLE, runnability.getPriority() + 1);
}
break;
case NOT_RUNNABLE:
break;
case RUNNING:
if (cPolicy.consumerWaitsForProducerToFinish()) {
runnability = new Runnability(Runnability.Tag.NOT_RUNNABLE, Integer.MAX_VALUE);
} else {
runnability = new Runnability(Runnability.Tag.RUNNABLE, 1);
}
break;
}
}
aggregateRunnability = Runnability.getWorstCase(aggregateRunnability, runnability);
if (aggregateRunnability.getTag() == Runnability.Tag.NOT_RUNNABLE) {
// already not runnable -- cannot get better. bail.
break;
}
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("aggregateRunnability: " + aggregateRunnability);
}
}
runnabilityMap.put(goal, aggregateRunnability);
return aggregateRunnability;
}