in services/jenkins-autoscaling/lambda_mxnet_ci/autoscaling/handler.py
def determine_scale_up_nodes(queue_items: List[Dict[str, Any]], nodes: List[Dict[str, Any]],
unconnected: Dict[str, List[str]]) -> Dict[str, int]:
"""
    Determine the number of nodes required for each label
:param queue_items: Currently enqueued items
:param nodes: Currently connected nodes
    :param unconnected: Instances which are starting or not yet connected
:return: Dict(label, nb_required_nodes)
"""
dict_required_executors: DefaultDict[str, int] = defaultdict(int)
idle_nodes_per_label = _get_idle_nodes_per_label(nodes_data=nodes)
cur_time_s = time.time()
for queue_item in queue_items:
# Make sure we're only processing queue items that are related to resource starvation
label = _label_from_queued_job(nodes=nodes, queue_item=queue_item)
if label:
if label not in _minimum_queue_times_sec(): # pragma: no cover
logging.error("Label %s from queue reason '%s' is not part of MINIMUM_QUEUE_TIMES_SEC - skipping..",
label, queue_item['why'])
continue
            # Only consider items which have been in the queue for a specific time. This ensures we're not scaling
            # up too aggressively.
queue_duration_s = cur_time_s - (queue_item['inQueueSince'] / 1000)
if queue_duration_s < _minimum_queue_times_sec()[label]:
                logging.debug('Queue duration of item %s has not reached the minimum queue time yet: %d < %d',
                              queue_item['id'], queue_duration_s, _minimum_queue_times_sec()[label])
continue
            # Check whether there really are no nodes available, or whether this job simply lacks permission to run
            # on a restricted node. Unfortunately, we can't access the individual executors, so we have to rely
            # on the is_idle metric. TODO: Find out if we can get more detailed data or maybe even blacklist a job.
            # This will trigger a ping-pong between scale-up and scale-down since we're unable to determine ahead
            # of time whether a job is actually lacking nodes to run on or whether it's just ineligible. For now,
            # we log an error. In the future, this could be handled automatically by another job. An investigation
            # should also be kicked off since this could mean somebody is trying to run on a restricted slave
            # without permission.
if label in idle_nodes_per_label and idle_nodes_per_label[label] > 0:
                logging.error('Queue item %s is scheduled for label %s, but there are %d idle nodes available. This '
                              'most likely means somebody is trying to schedule a job onto a restricted slave '
                              'without permission. Please investigate by checking the job queue.',
                              queue_item['id'], label, idle_nodes_per_label[label])
continue
            dict_required_executors[label] += 1
else:
logging.debug('Queue item %s is not related to resource starvation: %s',
queue_item['id'], queue_item['why'])
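    # Translate the per-label executor demand into the number of instances to launch
    # (presumably accounting for executors per node inside _calculate_nb_required_nodes)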
label2num_instances = _calculate_nb_required_nodes(dict_required_executors=dict_required_executors)
    # Subtract the number of instances that are already starting but not yet connected
for label, names in unconnected.items():
logging.debug('%d nodes of type %s currently starting', len(names), label)
resulting_number = label2num_instances.get(label, 0) - len(names)
        resulting_number = max(0, resulting_number)  # Clamp at zero in case more instances are starting than required
if resulting_number > 0:
label2num_instances[label] = resulting_number
            logging.debug('%d new nodes for %s required (%d already starting)',
                          resulting_number, label, len(names))
else:
            label2num_instances.pop(label, None)
            logging.debug('No new nodes for %s required (%d already starting)', label, len(names))
return label2num_instances
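

# Illustrative usage sketch (not part of the original handler). It shows the input shapes this
# function expects: queue items with Jenkins' 'id', 'why' and 'inQueueSince' (epoch millis)
# fields, the connected-node list, and a label -> instance-names mapping for nodes that are
# still starting. The label 'mxnetlinux-gpu' and the instance id are made-up placeholders; the
# actual result depends on the helper functions (_label_from_queued_job, _minimum_queue_times_sec,
# _calculate_nb_required_nodes) and the configured queue-time thresholds.
def _example_determine_scale_up_nodes() -> Dict[str, int]:
    sample_queue_items = [{
        'id': 1234,
        'why': 'Waiting for next available executor on mxnetlinux-gpu',
        'inQueueSince': int((time.time() - 600) * 1000),  # enqueued 10 minutes ago
    }]
    sample_nodes: List[Dict[str, Any]] = []  # currently connected nodes
    sample_unconnected = {'mxnetlinux-gpu': ['i-0123456789abcdef0']}  # one instance already starting
    # Might return e.g. {'mxnetlinux-gpu': 1} if two nodes were deemed necessary and one is already starting
    return determine_scale_up_nodes(queue_items=sample_queue_items,
                                    nodes=sample_nodes,
                                    unconnected=sample_unconnected)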