in core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala [49:259]
/**
 * Decides how the queue should react to the given snapshot.
 *
 * The decision is made in three stages:
 *  1. a non-positive namespace or action limit is treated as an error: the queue is asked to pause
 *     (or nothing happens while it is flushing);
 *  2. when no capacity is left, namespace throttling is enabled for a running queue, or disabled for a
 *     throttled queue when over-provisioning is allowed and there is still room within the
 *     over-provision ratio;
 *  3. otherwise the number of containers to add is derived from the queue state, the average activation
 *     duration and the number of stale activations.
 *
 * @param snapshot the current counters and state of the queue and of its namespace
 * @return a future holding the decision for this snapshot
 */
private[queue] def decide(snapshot: QueueSnapshot) = {
  val QueueSnapshot(
    initialized,
    incoming,
    currentMsg,
    existing,
    inProgress,
    staleActivationNum,
    existingContainerCountInNs,
    inProgressContainerCountInNs,
    averageDuration,
    namespaceLimit,
    actionLimit,
    maxActionConcurrency,
    stateName,
    _) = snapshot
  val totalContainers = existing + inProgress
  val availableMsg = currentMsg + incoming.get()
  val actionCapacity = actionLimit - totalContainers
  val namespaceCapacity = namespaceLimit - existingContainerCountInNs - inProgressContainerCountInNs
  // room left in the namespace once its limit is stretched by the configured over-provision ratio
  val overProvisionCapacity = ceiling(namespaceLimit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs

  // Activations a single container is expected to handle within `staleThreshold`.
  // A non-positive duration should not happen; it is guarded so the result never divides by zero.
  def containerThroughputFor(duration: Double): Double =
    if (duration <= 0) maxActionConcurrency.toDouble
    else (staleThreshold / duration) * maxActionConcurrency

  if (Math.min(namespaceLimit, actionLimit) <= 0) {
    // this is an error case, the limit should be bigger than 0
    stateName match {
      case Flushing => Future.successful(DecisionResults(Skip, 0))
      case _        => Future.successful(DecisionResults(Pausing, 0))
    }
  } else {
    val capacity =
      if (schedulingConfig.allowOverProvisionBeforeThrottle && totalContainers == 0) {
        // if space available within the over provision ratio amount above namespace limit, create one container for new
        // action so namespace traffic can attempt to re-balance without blocking entire action
        if (overProvisionCapacity > 0) 1 else 0
      } else {
        Math.min(namespaceCapacity, actionCapacity)
      }

    if (capacity <= 0) {
      stateName match {
        // If the container is created later (for any reason), all activations fail (too many requests).
        // However, if the container exists (totalContainers != 0), the activation is not treated as a failure
        // and the activation is delivered to the container, hence messages are only dropped without containers.
        // Throttle unless over-provisioning is allowed and the over-provision ratio still leaves room.
        case Running if !schedulingConfig.allowOverProvisionBeforeThrottle || overProvisionCapacity <= 0 =>
          logging.info(
            this,
            s"there is no capacity activations will be dropped or throttled, (availableMsg: $availableMsg totalContainers: $totalContainers, actionLimit: $actionLimit, namespaceLimit: $namespaceLimit, namespaceContainers: $existingContainerCountInNs, namespaceInProgressContainer: $inProgressContainerCountInNs) [$invocationNamespace:$action]")
          Future.successful(DecisionResults(EnableNamespaceThrottling(dropMsg = totalContainers == 0), 0))

        // over-provisioning is allowed and the namespace still has room within the over-provision ratio
        case NamespaceThrottled if schedulingConfig.allowOverProvisionBeforeThrottle && overProvisionCapacity > 0 =>
          Future.successful(DecisionResults(DisableNamespaceThrottling, 0))

        // do nothing; no need to print any messages if the state is already NamespaceThrottled
        case _ =>
          Future.successful(DecisionResults(Skip, 0))
      }
    } else {
      (stateName, averageDuration) match {
        // there is no container
        case (Running, None) if totalContainers == 0 && !initialized =>
          logging.info(
            this,
            s"add one initial container if totalContainers($totalContainers) == 0 [$invocationNamespace:$action]")
          Future.successful(DecisionResults(AddInitialContainer, 1))

        // Todo: when disabling throttling we may create some containers.
        case (NamespaceThrottled, _) =>
          Future.successful(DecisionResults(DisableNamespaceThrottling, 0))

        // this is an exceptional case, create a container immediately
        case (Running, _) if totalContainers == 0 && availableMsg > 0 =>
          logging.info(
            this,
            s"add one container if totalContainers($totalContainers) == 0 && availableMsg($availableMsg) > 0 [$invocationNamespace:$action]")
          Future.successful(DecisionResults(AddContainer, 1))

        case (Flushing, _) if totalContainers == 0 =>
          logging.info(
            this,
            s"add one container case Flushing if totalContainers($totalContainers) == 0 [$invocationNamespace:$action]")
          // it is highly likely the queue could not create an initial container if the limit is 0
          Future.successful(DecisionResults(AddInitialContainer, 1))

        // There is no activation result yet, but some activations became stale.
        // It may cause some over-provisioning if it takes much time to create a container and execution time is
        // short, but it is a trade-off and we place latency on top of over-provisioning.
        // The Removing state takes the same path: when shutting down the scheduler with too many activations
        // left in the queue, more containers are added as a last resort.
        case (Running | Removing, None) if staleActivationNum > 0 =>
          val num = ceiling(staleActivationNum.toDouble / maxActionConcurrency.toDouble) - inProgress
          // if it tries to create more containers than existing messages, we just create shortage
          val actualNum = if (num > availableMsg) availableMsg else num
          addServersIfPossible(
            existing,
            inProgress,
            0,
            availableMsg,
            capacity,
            namespaceCapacity,
            actualNum,
            staleActivationNum,
            0.0,
            Running)

        // need more containers and a message is already processed
        case (Running, Some(duration)) =>
          val containerThroughput = containerThroughputFor(duration)
          val expectedTps = containerThroughput * totalContainers
          val availableNonStaleActivations = availableMsg - staleActivationNum
          // containers needed just to drain the stale activations, minus those already being created
          val staleContainerProvision =
            if (staleActivationNum > 0) {
              val num = ceiling(staleActivationNum.toDouble / containerThroughput)
              // if it tries to create more containers than existing messages, we just create shortage
              Math.min(num, staleActivationNum) - inProgress
            } else {
              0
            }

          if (availableNonStaleActivations >= expectedTps && totalContainers < availableNonStaleActivations && duration > 0) {
            val num = ceiling((availableNonStaleActivations / containerThroughput) - existing - inProgress)
            // if it tries to create more containers than existing messages, we just create shortage
            val actualNum =
              if (num + totalContainers > availableNonStaleActivations) availableNonStaleActivations - totalContainers
              else num
            addServersIfPossible(
              existing,
              inProgress,
              containerThroughput,
              availableMsg,
              capacity,
              namespaceCapacity,
              actualNum + staleContainerProvision,
              staleActivationNum,
              duration,
              Running)
          } else if (staleContainerProvision > 0) {
            addServersIfPossible(
              existing,
              inProgress,
              containerThroughput,
              availableMsg,
              capacity,
              namespaceCapacity,
              staleContainerProvision,
              staleActivationNum,
              duration,
              Running)
          } else {
            Future.successful(DecisionResults(Skip, 0))
          }

        // Generally we assume there are enough containers for actions when shutting down the scheduler,
        // but if there were already too many activations in the queue with not enough containers,
        // we add more containers to quickly consume those messages as a last resort.
        case (Removing, Some(duration)) if staleActivationNum > 0 =>
          val containerThroughput = containerThroughputFor(duration)
          val num = ceiling(staleActivationNum.toDouble / containerThroughput)
          // if it tries to create more containers than existing messages, we just create shortage
          val actualNum = Math.min(num, staleActivationNum) - inProgress
          addServersIfPossible(
            existing,
            inProgress,
            containerThroughput,
            availableMsg,
            capacity,
            namespaceCapacity,
            actualNum,
            staleActivationNum,
            duration,
            Running)

        // do nothing
        case _ =>
          Future.successful(DecisionResults(Skip, 0))
      }
    }
  }
}