private Runnability assignRunnabilityRank()

in hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java [226:301]


    /**
     * Determines the runnability of {@code goal}, memoizing the answer in {@code runnabilityMap}.
     * <p>
     * A goal already present in the map is answered from the map. A goal whose last attempt is
     * COMPLETED or RUNNING is tagged accordingly with priority {@link Integer#MIN_VALUE}. Otherwise
     * the result is the worst case (per {@code Runnability.getWorstCase}) over all partitions the
     * goal requires, starting from RUNNABLE with priority 0.
     *
     * @param goal
     *            the task cluster to evaluate
     * @param runnabilityMap
     *            cache of results computed so far; updated with the result for {@code goal}
     * @return the runnability recorded for {@code goal}
     */
    private Runnability assignRunnabilityRank(TaskCluster goal, Map<TaskCluster, Runnability> runnabilityMap) {
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine("Computing runnability: " + goal);
        }
        if (runnabilityMap.containsKey(goal)) {
            return runnabilityMap.get(goal);
        }

        TaskClusterAttempt previous = findLastTaskClusterAttempt(goal);
        if (previous != null) {
            if (LOGGER.isLoggable(Level.FINE)) {
                LOGGER.fine("Last Attempt Status: " + previous.getStatus());
            }
            // A completed or still-running attempt settles the answer without looking at inputs.
            Runnability.Tag settledTag = null;
            if (previous.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
                settledTag = Runnability.Tag.COMPLETED;
            } else if (previous.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
                settledTag = Runnability.Tag.RUNNING;
            }
            if (settledTag != null) {
                Runnability settled = new Runnability(settledTag, Integer.MIN_VALUE);
                runnabilityMap.put(goal, settled);
                return settled;
            }
        }

        Map<ConnectorDescriptorId, IConnectorPolicy> policies = jobRun.getConnectorPolicyMap();
        PartitionMatchMaker matchMaker = jobRun.getPartitionMatchMaker();
        Runnability worstSoFar = new Runnability(Runnability.Tag.RUNNABLE, 0);
        for (PartitionId required : goal.getRequiredPartitions()) {
            Runnability partitionRank = rankRequiredPartition(required, policies, matchMaker, runnabilityMap);
            worstSoFar = Runnability.getWorstCase(worstSoFar, partitionRank);
            if (worstSoFar.getTag() == Runnability.Tag.NOT_RUNNABLE) {
                // Once not runnable, the remaining partitions are not inspected.
                break;
            }
            if (LOGGER.isLoggable(Level.FINE)) {
                LOGGER.fine("aggregateRunnability: " + worstSoFar);
            }
        }
        runnabilityMap.put(goal, worstSoFar);
        return worstSoFar;
    }

    /**
     * Ranks a single required partition: by its maximum available state when that suffices,
     * otherwise by the runnability of the task cluster that produces it.
     */
    private Runnability rankRequiredPartition(PartitionId pid, Map<ConnectorDescriptorId, IConnectorPolicy> policies,
            PartitionMatchMaker matchMaker, Map<TaskCluster, Runnability> runnabilityMap) {
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine("Inspecting required partition: " + pid);
        }
        IConnectorPolicy policy = policies.get(pid.getConnectorDescriptorId());
        PartitionState bestState = matchMaker.getMaximumAvailableState(pid);
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine("Policy: " + policy + " maxState: " + bestState);
        }

        if (PartitionState.COMMITTED.equals(bestState)) {
            return new Runnability(Runnability.Tag.RUNNABLE, 0);
        }
        if (PartitionState.STARTED.equals(bestState) && !policy.consumerWaitsForProducerToFinish()) {
            return new Runnability(Runnability.Tag.RUNNABLE, 1);
        }

        // Partition not usable yet: fall back to the state of its producing task cluster.
        Runnability producerRank = assignRunnabilityRank(partitionProducingTaskClusterMap.get(pid), runnabilityMap);
        Runnability.Tag producerTag = producerRank.getTag();
        switch (producerTag) {
            case RUNNABLE:
            case RUNNING:
                if (policy.consumerWaitsForProducerToFinish()) {
                    return new Runnability(Runnability.Tag.NOT_RUNNABLE, Integer.MAX_VALUE);
                }
                // A runnable producer pushes the consumer one step behind it; a running one yields 1.
                return new Runnability(Runnability.Tag.RUNNABLE,
                        producerTag == Runnability.Tag.RUNNABLE ? producerRank.getPriority() + 1 : 1);

            default:
                // NOT_RUNNABLE and any other tag are passed through unchanged.
                return producerRank;
        }
    }