in core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala [260:370]
protected def releaseInvoker(invoker: InvokerInstanceId, entry: ActivationEntry): Unit
// Singletons for counter metrics related to completion acks
protected val LOADBALANCER_COMPLETION_ACK_REGULAR =
LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, RegularCompletionAck)
protected val LOADBALANCER_COMPLETION_ACK_FORCED =
LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, ForcedCompletionAck)
protected val LOADBALANCER_COMPLETION_ACK_HEALTHCHECK =
LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, HealthcheckCompletionAck)
protected val LOADBALANCER_COMPLETION_ACK_REGULAR_AFTER_FORCED =
LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, RegularAfterForcedCompletionAck)
protected val LOADBALANCER_COMPLETION_ACK_FORCED_AFTER_REGULAR =
LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, ForcedAfterRegularCompletionAck)
/** 6. Process the completion ack and update the state */
protected[loadBalancer] def processCompletion(aid: ActivationId,
tid: TransactionId,
forced: Boolean,
isSystemError: Boolean,
instance: InstanceId): Unit = {
val invoker = instance match {
case i: InvokerInstanceId => Some(i)
case _ => None
}
val invocationResult = if (forced) {
InvocationFinishedResult.Timeout
} else {
// If the response contains a system error, report that, otherwise report Success
// Left generally is considered a Success, since that could be a message not fitting into Kafka
if (isSystemError) {
InvocationFinishedResult.SystemError
} else {
InvocationFinishedResult.Success
}
}
activationSlots.remove(aid) match {
case Some(entry) =>
totalActivations.decrement()
val totalActivationMemory =
if (entry.isBlackbox) totalBlackBoxActivationMemory else totalManagedActivationMemory
totalActivationMemory.add(entry.memoryLimit.toMB * (-1))
activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
activationsPerController.get(entry.controllerId).foreach(_.decrement())
activationsPerInvoker
.get(InvokerInstanceId(entry.invokerName.instance, userMemory = 0.MB))
.foreach(_.decrement())
invoker.foreach(releaseInvoker(_, entry))
if (!forced) {
entry.timeoutHandler.cancel()
// notice here that the activationPromises is not touched, because the expectation is that
// the active ack is received as expected, and processing that message removed the promise
// from the corresponding map
logging.info(this, s"received completion ack for '$aid', system error=$isSystemError")(tid)
MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_REGULAR)
} else {
// the entry has timed out; if the active ack is still around, remove its entry also
// and complete the promise with a failure if necessary
activationPromises
.remove(aid)
.foreach(_.tryFailure(new Throwable("no completion or active ack received yet")))
val actionType = if (entry.isBlackbox) "blackbox" else "managed"
val blockingType = if (entry.isBlocking) "blocking" else "non-blocking"
val completionAckTimeout = calculateCompletionAckTimeout(entry.timeLimit)
logging.warn(
this,
s"forced completion ack for '$aid', action '${entry.fullyQualifiedEntityName}' ($actionType), $blockingType, mem limit ${entry.memoryLimit.toMB} MB, time limit ${entry.timeLimit.toMillis} ms, completion ack timeout $completionAckTimeout from $instance")(
tid)
MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_FORCED)
}
// Completion acks that are received here are strictly from user actions - health actions are not part of
// the load balancer's activation map. Inform the invoker pool supervisor of the user action completion.
// guard this
invoker.foreach(invokerPool ! InvocationFinishedMessage(_, invocationResult))
case None if tid == TransactionId.invokerHealth =>
// Health actions do not have an ActivationEntry as they are written on the message bus directly. Their result
// is important to pass to the invokerPool because they are used to determine if the invoker can be considered
// healthy again.
logging.info(this, s"received completion ack for health action on $instance")(tid)
MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_HEALTHCHECK)
// guard this
invoker.foreach(invokerPool ! InvocationFinishedMessage(_, invocationResult))
case None if !forced =>
// Received a completion ack that has already been taken out of the state because of a timeout (forced ack).
// The result is ignored because a timeout has already been reported to the invokerPool per the force.
// Logging this condition as a warning because the invoker processed the activation and sent a completion
// message - but not in time.
logging.warn(
this,
s"received completion ack for '$aid' from $instance which has no entry, system error=$isSystemError")(tid)
MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_REGULAR_AFTER_FORCED)
case None =>
// The entry has already been removed by a completion ack. This part of the code is reached by the timeout and can
// happen if completion ack and timeout happen roughly at the same time (the timeout was triggered before the completion
// ack canceled the timer). As the completion ack is already processed we don't have to do anything here.
logging.debug(this, s"forced completion ack for '$aid' which has no entry")(tid)
MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_FORCED_AFTER_REGULAR)
}
}