in core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala [251:315]
/**
 * Reports the invokers currently held in the scheduling state.
 *
 * @return an already-completed future wrapping `schedulingState.invokers`
 */
override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = {
  val knownInvokers = schedulingState.invokers
  Future.successful(knownInvokers)
}
/**
 * Exposes the cluster size recorded in the scheduling state.
 *
 * @return the current value of `schedulingState.clusterSize`
 */
override def clusterSize: Int = {
  schedulingState.clusterSize
}
/** 1. Publish a message to the loadbalancer */
override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
  implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
  // Overview:
  //   1. pick the invoker pool (managed vs. blackbox) based on `action.exec.pull`,
  //   2. derive a home invoker and a step size from a hash of namespace + action name,
  //   3. ask `ShardingContainerPoolBalancer.schedule` for an invoker,
  //   4. on success set up the activation and send the message to that invoker,
  //      otherwise log the per-status invoker counts and fail.
  //
  // Result: the outer future fails with `LoadBalancerException` when no invoker was chosen;
  // otherwise it completes once `sendActivationToInvoker` completes and yields the value
  // returned by `setupActivation`.
  val blackbox = action.exec.pull
  val actionType = if (blackbox) "blackbox" else "managed"

  // Invokers and step sizes always come from the same pool.
  val (invokersToUse, stepSizes) =
    if (blackbox) (schedulingState.blackboxInvokers, schedulingState.blackboxStepSizes)
    else (schedulingState.managedInvokers, schedulingState.managedStepSizes)

  val chosen: Option[InvokerInstanceId] =
    if (invokersToUse.isEmpty) {
      // Nothing to schedule on; handled by the failure branch below.
      None
    } else {
      val hash = ShardingContainerPoolBalancer.generateHash(msg.user.namespace.name, action.fullyQualifiedName(false))
      val homeInvoker = hash % invokersToUse.size
      val stepSize = stepSizes(hash % stepSizes.size)

      ShardingContainerPoolBalancer
        .schedule(
          action.limits.concurrency.maxConcurrent,
          action.fullyQualifiedName(true),
          invokersToUse,
          schedulingState.invokerSlots,
          action.limits.memory.megabytes,
          homeInvoker,
          stepSize)
        .map {
          case (instance, overloaded) =>
            // A `true` flag from the scheduler is counted with the pool-specific
            // *_SYSTEM_OVERLOAD marker before the invoker is handed on.
            if (overloaded) {
              val metric =
                if (blackbox) LoggingMarkers.BLACKBOX_SYSTEM_OVERLOAD
                else LoggingMarkers.MANAGED_SYSTEM_OVERLOAD
              MetricEmitter.emitCounterMetric(metric)
            }
            instance
        }
    }

  // Label used in the log line to tell default limits from customised ones.
  def limitTag(isStandard: Boolean): String = if (isStandard) "std" else "non-std"

  chosen match {
    case Some(invoker) =>
      // MemoryLimit() and TimeLimit() return singletons - they should be fast enough to be used here
      val memoryLimit = action.limits.memory
      val memoryLimitInfo = limitTag(memoryLimit == MemoryLimit())
      val timeLimit = action.limits.timeout
      val timeLimitInfo = limitTag(timeLimit == TimeLimit())
      logging.info(
        this,
        s"scheduled activation ${msg.activationId}, action '${msg.action.asString}' ($actionType), ns '${msg.user.namespace.name.asString}', mem limit ${memoryLimit.megabytes} MB (${memoryLimitInfo}), time limit ${timeLimit.duration.toMillis} ms (${timeLimitInfo}) to ${invoker}")

      // Set up the activation first, then send the message; the setup result is only
      // handed out after the send future has completed.
      val activationResult = setupActivation(msg, action, invoker)
      sendActivationToInvoker(messageProducer, msg, invoker).map(_ => activationResult)

    case None =>
      // Count the candidate invokers per status so the error log shows the pool state.
      val invokerStates = invokersToUse.foldLeft(Map.empty[InvokerState, Int]) { (counts, candidate) =>
        counts.updated(candidate.status, counts.getOrElse(candidate.status, 0) + 1)
      }
      logging.error(
        this,
        s"failed to schedule activation ${msg.activationId}, action '${msg.action.asString}' ($actionType), ns '${msg.user.namespace.name.asString}' - invokers to use: $invokerStates")
      Future.failed(LoadBalancerException("No invokers available"))
  }
}