private def handleActivationMessage()

in core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala [903:997]


  /**
   * Resolves the action referenced by an activation message and pairs it with the message.
   *
   * When the invoking namespace is blacklisted, only an active-ack carrying an application
   * error is sent (nothing is stored) and the returned future fails.
   *
   * Otherwise the action document is fetched from the entity store. A revision mismatch
   * triggers a retry without a revision; any other failure produces a fallback activation
   * that is acked and stored, after which the returned future fails.
   *
   * @param msg    the activation message to process
   * @param action the action used to build fallback activations when the lookup cannot complete
   * @return a future holding the [[RunActivation]] for the fetched executable action, or a failed
   *         future carrying an `IllegalStateException` when the activation cannot be run
   */
  private def handleActivationMessage(msg: ActivationMessage, action: ExecutableWhiskAction): Future[RunActivation] = {
    implicit val transid = msg.transid
    logging.info(this, s"received a message ${msg.activationId} for ${msg.action} in $stateName")

    // Translates a failed lookup into the response reported for the activation.
    // A missing document means the user deleted the action concurrently, which is an
    // application error, and so is a limits violation. Everything else is a system error.
    def failureResponse(cause: Throwable) = cause match {
      case _: NoDocumentException =>
        ExecutionResponse.applicationError(Messages.actionRemovedWhileInvoking)
      case limitViolation: ActionLimitsException =>
        // reuse the message generated by the limit check
        ExecutionResponse.applicationError(limitViolation.getMessage)
      case _: DocumentTypeMismatchException | _: DocumentUnreadable =>
        ExecutionResponse.whiskError(Messages.actionMismatchWhileInvoking)
      case unexpected: Throwable =>
        logging.error(this, s"An unknown DB connection error occurred while fetching an action: ${unexpected}.")
        ExecutionResponse.whiskError(Messages.actionFetchErrorWhileInvoking)
    }

    if (namespaceBlacklist.isBlacklisted(msg.user)) {
      // A blacklisted namespace still gets an active-ack so the loadbalancer protocol is kept,
      // but because the blacklist is protective no database entry is written.
      val rejected =
        generateFallbackActivation(action, msg, ExecutionResponse.applicationError(Messages.namespacesBlacklisted))
      sendActiveAck(
        msg.transid,
        rejected,
        false,
        msg.rootControllerIndex,
        msg.user.namespace.uuid,
        CombinedCompletionAndResultMessage(msg.transid, rejected, instance))
      logging.warn(
        this,
        s"namespace ${msg.user.namespace.name} was blocked in containerProxy, complete msg ${msg.activationId} with error.")
      Future.failed(new IllegalStateException(s"namespace ${msg.user.namespace.name} was blocked in containerProxy."))
    } else {
      logging.debug(this, s"namespace ${msg.user.namespace.name} is not in the namespaceBlacklist")
      val docInfo = FullyQualifiedEntityName(msg.action.path, msg.action.name).toDocId.asDocInfo(msg.revision)
      val requester = msg.user.subject

      logging.debug(this, s"${docInfo.id} $requester ${msg.activationId}")

      // continue the trace started upstream
      WhiskTracerProvider.tracer.setTraceContext(transid, msg.traceContext)

      // Caching is safe when a revision is known: an updated action carries a new revision id
      // and therefore never hits a stale cache entry. Without a revision the cache is bypassed.
      val hasRevision = docInfo.rev != DocRevision.empty
      if (!hasRevision)
        logging.warn(this, s"revision was not provided for ${docInfo.id}")

      val lookup = get(entityStore, docInfo.id, docInfo.rev, hasRevision, false)

      val runnable = lookup.flatMap { fetched =>
        // actions exceeding the limits must not be executed
        fetched.limits.checkLimits(msg.user)
        fetched.toExecutableWhiskAction
          .map(executable => Future.successful(RunActivation(executable, msg)))
          .getOrElse {
            logging
              .error(this, s"non-executable action reached the invoker ${fetched.fullyQualifiedName(false)}")
            Future.failed(new IllegalStateException("non-executable action reached the invoker"))
          }
      }

      runnable.recoverWith {
        case DocumentRevisionMismatchException(_) =>
          // the action may have been updated in the meantime, so retry against the latest code
          logging.warn(
            this,
            s"msg ${msg.activationId} for ${msg.action} in $stateName is updated, fetching latest code")
          handleActivationMessage(msg.copy(revision = DocRevision.empty), action)
        case failure =>
          val response = failureResponse(failure)
          val errMsg =
            s"Error to fetch action ${msg.action} for msg ${msg.activationId}, error is ${failure.getMessage}"
          logging.error(this, errMsg)

          val userContext = UserContext(msg.user)
          val fallback = generateFallbackActivation(action, msg, response)
          sendActiveAck(
            transid,
            fallback,
            msg.blocking,
            msg.rootControllerIndex,
            msg.user.namespace.uuid,
            CombinedCompletionAndResultMessage(transid, fallback, instance))
          storeActivation(msg.transid, fallback, msg.blocking, userContext)

          // fail the future so the container proxy can be terminated, e.g. when the action is removed
          Future.failed(new IllegalStateException(errMsg))
      }
    }
  }