def determine_scale_up_nodes()

in services/jenkins-autoscaling/lambda_mxnet_ci/autoscaling/handler.py [0:0]


def determine_scale_up_nodes(queue_items: List[Dict[str, Any]], nodes: List[Dict[str, Any]],
                             unconnected: Dict[str, List[str]]) -> Dict[str, int]:
    """
    Determine the number of new nodes required for each label.

    Only queue items that (a) map to a known label, (b) have been queued longer than
    that label's minimum queue time, and (c) have no idle nodes of that label available
    are counted towards demand. Instances that are already starting (unconnected) are
    subtracted from the result so we do not double-provision.

    :param queue_items: Currently enqueued items
    :param nodes: Currently connected nodes
    :param unconnected: Instances which are starting or not yet connected, keyed by label
    :return: Dict(label, nb_required_nodes)
    """
    dict_required_executors: DefaultDict[str, int] = defaultdict(int)

    idle_nodes_per_label = _get_idle_nodes_per_label(nodes_data=nodes)

    cur_time_s = time.time()
    for queue_item in queue_items:
        # Make sure we're only processing queue items that are related to resource starvation
        label = _label_from_queued_job(nodes=nodes, queue_item=queue_item)
        if not label:
            logging.debug('Queue item %s is not related to resource starvation: %s',
                          queue_item['id'], queue_item['why'])
            continue

        if label not in _minimum_queue_times_sec():  # pragma: no cover
            logging.error("Label %s from queue reason '%s' is not part of MINIMUM_QUEUE_TIMES_SEC - skipping..",
                          label, queue_item['why'])
            continue

        # Only consider items which have been in the queue for a specific time. This ensure we're not scaling
        # too aggressively. 'inQueueSince' is in milliseconds since epoch.
        queue_duration_s = cur_time_s - (queue_item['inQueueSince'] / 1000)
        if queue_duration_s < _minimum_queue_times_sec()[label]:
            logging.debug('Queue duration of item %s is not mature enough: %d<%d',
                          queue_item['id'], queue_duration_s, _minimum_queue_times_sec()[label])
            continue

        # See if there are actually no nodes available or if the problem is actually that this job has no permission
        # to run on a restricted node. Unfortunately, we can't access the individual executors, so we have to rely
        # on the is_idle metric. TODO: Find out if we can get more detailed data or maybe even blacklist a job
        # Unfortunately, this will trigger a ping-pong in scale up and down since we're unable to determine ahead of
        # time whether a job is actually lacking nodes to run on or whether it's just ineligible. For now, we will
        # throw an error. In future, this could automatically be handled by another job. Also, an investigation
        # should be kicked off since this could mean somebody is trying to run on a restricted slave without
        # permission.
        if idle_nodes_per_label.get(label, 0) > 0:
            logging.error('Queue item %s is scheduled for label %s, but there are %d idle nodes available. This is '
                          'most likely somebody unauthorized trying to schedule an unrestricted job onto a '
                          'restricted slave. Please investigate by checking the job queue.',
                          queue_item['id'], label, idle_nodes_per_label[label])
            continue

        dict_required_executors[label] += 1

    label2num_instances = _calculate_nb_required_nodes(dict_required_executors=dict_required_executors)

    # Subtract the number of unconnected instances - they are already on their way and will
    # satisfy part of the demand once they connect.
    for label, names in unconnected.items():
        logging.debug('%d nodes of type %s currently starting', len(names), label)
        previously_required = label2num_instances.get(label, 0)
        # Clamp at zero: more instances may be starting than are currently required
        resulting_number = max(0, previously_required - len(names))
        if resulting_number > 0:
            label2num_instances[label] = resulting_number
            logging.debug('%d new nodes for %s required (down from %d)',
                          resulting_number, label, previously_required)
        elif label in label2num_instances:
            del label2num_instances[label]
            logging.debug('No new nodes for %s required (down from %d)', label, previously_required)

    return label2num_instances