in core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala [441:557]
/**
 * Picks an invoker for every container creation message.
 *
 * Messages are processed in order. The list of remaining candidate invokers is threaded through
 * the fold, so the memory reserved for one message is visible when the next message of the same
 * batch is scheduled.
 *
 * Selection rules per message:
 *  - No required resources annotated: choose among the managed (or, for blackbox actions, the
 *    blackbox) share of untagged invokers that report enough user memory; if none qualifies,
 *    fall back to tagged invokers; otherwise fail with `NoAvailableInvokersError`.
 *  - Required resources annotated: choose among invokers whose tags contain all of them. If there
 *    is none and the strict policy is on (the default when the annotation is absent), fail with
 *    `NoAvailableResourceInvokersError`. With the strict policy off, fall back to untagged
 *    invokers first and then to the remaining tagged ones.
 *
 * @param invokers  the invokers usable for scheduling
 * @param msgs      the creation messages to schedule; may be empty
 * @param minMemory not referenced by this method; kept for interface compatibility
 * @return one [[ScheduledPair]] per message, in reverse message order (results are prepended)
 */
def schedule(invokers: List[InvokerHealth], msgs: List[ContainerCreationMessage], minMemory: ByteSize)(
  implicit logging: Logging): List[ScheduledPair] = {
  logging.info(this, s"usable total invoker size: ${invokers.size}")
  val noTaggedInvokers = invokers.filter(_.id.tags.isEmpty)
  val managed = Math.max(1, Math.ceil(noTaggedInvokers.size.toDouble * managedFraction).toInt)
  val blackboxes = Math.max(1, Math.floor(noTaggedInvokers.size.toDouble * blackboxFraction).toInt)
  // The instance ids of both partitions are computed once instead of once per candidate per message.
  val managedInstances = noTaggedInvokers.take(managed).map(_.id.instance).toSet
  val blackboxInstances = noTaggedInvokers.takeRight(blackboxes).map(_.id.instance).toSet

  // `headOption` keeps an empty batch from throwing; the fold below then simply yields Nil.
  msgs.headOption.foreach { first =>
    logging.info(
      this,
      s"${msgs.size} creation messages for ${first.invocationNamespace}/${first.action}, managedFraction:$managedFraction, blackboxFraction:$blackboxFraction, managed invoker size:$managed, blackboxes invoker size:$blackboxes")
  }

  msgs
    .foldLeft((List.empty[ScheduledPair], invokers)) {
      case ((pairs, candidates), msg) =>
        val requiredMemory = msg.whiskActionMetaData.limits.memory.megabytes
        val requiredResources =
          msg.whiskActionMetaData.annotations
            .getAs[Seq[String]](Annotations.InvokerResourcesAnnotationName)
            .getOrElse(Seq.empty[String])
        val resourcesStrictPolicy = msg.whiskActionMetaData.annotations
          .getAs[Boolean](Annotations.InvokerResourcesStrictPolicyAnnotationName)
          .getOrElse(true)
        val isBlackboxInvocation = msg.whiskActionMetaData.toExecutableWhiskAction.exists(_.exec.pull)

        // Chooses an invoker out of `pool` and records the reservation against the *current*
        // candidates (not the initial invoker list), so that earlier reservations made for this
        // batch are not lost.
        def assignFrom(pool: List[InvokerHealth]): (List[ScheduledPair], List[InvokerHealth]) = {
          val scheduledPair = chooseInvokerFromCandidates(pool, msg)
          val remaining = updateInvokerMemory(scheduledPair.invokerId, requiredMemory, candidates)
          (scheduledPair :: pairs, remaining)
        }

        if (requiredResources.isEmpty) {
          // only choose managed invokers or blackbox invokers
          val wantedInstances = if (isBlackboxInvocation) {
            logging.info(this, s"[${msg.invocationNamespace}/${msg.action}] looking for blackbox invokers to schedule.")
            blackboxInstances
          } else {
            logging.info(this, s"[${msg.invocationNamespace}/${msg.action}] looking for managed invokers to schedule.")
            managedInstances
          }
          // The Set round trip is kept from the previous implementation: it drops duplicate entries.
          val wantedInvokers = candidates
            .filter(c => wantedInstances.contains(c.id.instance) && c.id.userMemory.toMB >= requiredMemory)
            .toSet

          if (wantedInvokers.nonEmpty) {
            assignFrom(wantedInvokers.toList)
          } else {
            val taggedInvokers = candidates.filter(_.id.tags.nonEmpty)
            if (taggedInvokers.nonEmpty) { // if not found from the wanted invokers, choose tagged invokers then
              logging.info(
                this,
                s"[${msg.invocationNamespace}/${msg.action}] since there is no available non-tagged invoker, choose one among tagged invokers.")
              assignFrom(taggedInvokers)
            } else {
              logging.error(
                this,
                s"[${msg.invocationNamespace}/${msg.action}] there is no invoker available to schedule to schedule.")
              val scheduledPair =
                ScheduledPair(msg, invokerId = None, Some(NoAvailableInvokersError))
              // Nothing was reserved: hand the current candidates on unchanged.
              (scheduledPair :: pairs, candidates)
            }
          }
        } else {
          logging.info(this, s"[${msg.invocationNamespace}/${msg.action}] looking for tagged invokers to schedule.")
          val requiredTags = requiredResources.toSet
          val wantedInvokers = candidates.filter(health => requiredTags.subsetOf(health.id.tags.toSet))

          if (wantedInvokers.nonEmpty) {
            assignFrom(wantedInvokers)
          } else if (resourcesStrictPolicy) {
            logging.error(
              this,
              s"[${msg.invocationNamespace}/${msg.action}] there is no available invoker with the resource: ${requiredResources}")
            val scheduledPair =
              ScheduledPair(msg, invokerId = None, Some(NoAvailableResourceInvokersError))
            (scheduledPair :: pairs, candidates)
          } else {
            logging.info(
              this,
              s"[${msg.invocationNamespace}/${msg.action}] since there is no available invoker with the resource, choose any invokers without the resource.")
            val (untaggedCandidates, taggedCandidates) = candidates.partition(_.id.tags.isEmpty)
            if (untaggedCandidates.nonEmpty) { // choose no tagged invokers first
              assignFrom(untaggedCandidates)
            } else {
              val leftInvokers =
                taggedCandidates.filterNot(health => requiredTags.subsetOf(health.id.tags.toSet))
              if (leftInvokers.nonEmpty) {
                assignFrom(leftInvokers)
              } else {
                logging.error(this, s"[${msg.invocationNamespace}/${msg.action}] no available invoker is found")
                val scheduledPair =
                  ScheduledPair(msg, invokerId = None, Some(NoAvailableInvokersError))
                (scheduledPair :: pairs, candidates)
              }
            }
          }
        }
    }
    ._1 // pairs
}