in core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala [903:997]
/**
 * Resolves the action referenced by an activation message so that it can be run.
 *
 * When the message's user is blacklisted, a fallback activation carrying an application error is
 * acknowledged (but not stored) and the returned future fails. Otherwise the action is fetched via
 * `get`, its limits are checked against the user and it is converted to an executable action.
 * A revision mismatch is retried with an empty revision; every other failure is acknowledged and
 * stored as a fallback activation, after which the returned future fails.
 *
 * @param msg    the activation message to handle
 * @param action the executable action used to build fallback activations on the failure paths
 * @return a future holding the [[RunActivation]] that pairs the fetched executable action with `msg`;
 *         the future fails with an `IllegalStateException` on the blacklist and fetch-error paths
 */
private def handleActivationMessage(msg: ActivationMessage, action: ExecutableWhiskAction): Future[RunActivation] = {
  implicit val transid = msg.transid
  logging.info(this, s"received a message ${msg.activationId} for ${msg.action} in $stateName")

  if (namespaceBlacklist.isBlacklisted(msg.user)) {
    // A blacklisted namespace only gets an active-ack so that the loadbalancer protocol is kept;
    // because the blacklist is protective, nothing is written to the database.
    val rejected =
      generateFallbackActivation(action, msg, ExecutionResponse.applicationError(Messages.namespacesBlacklisted))
    sendActiveAck(
      msg.transid,
      rejected,
      false,
      msg.rootControllerIndex,
      msg.user.namespace.uuid,
      CombinedCompletionAndResultMessage(msg.transid, rejected, instance))
    logging.warn(
      this,
      s"namespace ${msg.user.namespace.name} was blocked in containerProxy, complete msg ${msg.activationId} with error.")
    Future.failed(new IllegalStateException(s"namespace ${msg.user.namespace.name} was blocked in containerProxy."))
  } else {
    logging.debug(this, s"namespace ${msg.user.namespace.name} is not in the namespaceBlacklist")
    val docInfo = FullyQualifiedEntityName(msg.action.path, msg.action.name).toDocId.asDocInfo(msg.revision)
    logging.debug(this, s"${docInfo.id} ${msg.user.subject} ${msg.activationId}")

    // continue the trace that was started upstream
    WhiskTracerProvider.tracer.setTraceContext(transid, msg.traceContext)

    // Caching is safe because actions carry a revision id: an updated action has a different
    // revision and therefore misses the cache. Without a revision the cache is bypassed.
    val revisionKnown = docInfo.rev != DocRevision.empty
    if (!revisionKnown) {
      logging.warn(this, s"revision was not provided for ${docInfo.id}")
    }

    // Maps a fetch failure to the response reported to the user. A missing document means the
    // user removed the action concurrently (application error), a limits violation reports its
    // own message, and everything else is treated as a system error.
    def responseFor(cause: Throwable) = cause match {
      case _: NoDocumentException =>
        ExecutionResponse.applicationError(Messages.actionRemovedWhileInvoking)
      case limitsViolation: ActionLimitsException =>
        ExecutionResponse.applicationError(limitsViolation.getMessage)
      case _: DocumentTypeMismatchException | _: DocumentUnreadable =>
        ExecutionResponse.whiskError(Messages.actionMismatchWhileInvoking)
      case unexpected: Throwable =>
        logging.error(this, s"An unknown DB connection error occurred while fetching an action: $unexpected.")
        ExecutionResponse.whiskError(Messages.actionFetchErrorWhileInvoking)
    }

    get(entityStore, docInfo.id, docInfo.rev, revisionKnown, false)
      .flatMap { fetched =>
        // an action exceeding its limits must not be executed
        fetched.limits.checkLimits(msg.user)
        fetched.toExecutableWhiskAction
          .map(executable => Future.successful(RunActivation(executable, msg)))
          .getOrElse {
            logging.error(this, s"non-executable action reached the invoker ${fetched.fullyQualifiedName(false)}")
            Future.failed(new IllegalStateException("non-executable action reached the invoker"))
          }
      }
      .recoverWith {
        case DocumentRevisionMismatchException(_) =>
          // a mismatching revision suggests the action was updated, so retry against the latest code
          logging.warn(
            this,
            s"msg ${msg.activationId} for ${msg.action} in $stateName is updated, fetching latest code")
          handleActivationMessage(msg.copy(revision = DocRevision.empty), action)

        case cause =>
          val response = responseFor(cause)
          val errMsg = s"Error to fetch action ${msg.action} for msg ${msg.activationId}, error is ${cause.getMessage}"
          logging.error(this, errMsg)
          val userContext = UserContext(msg.user)
          val fallback = generateFallbackActivation(action, msg, response)
          sendActiveAck(
            transid,
            fallback,
            msg.blocking,
            msg.rootControllerIndex,
            msg.user.namespace.uuid,
            CombinedCompletionAndResultMessage(transid, fallback, instance))
          storeActivation(msg.transid, fallback, msg.blocking, userContext)
          // fail the future so that the container proxy can be terminated, e.g. when the action is gone
          Future.failed(new IllegalStateException(errMsg))
      }
  }
}