private def initializeAndRunActivation()

in core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala [1044:1241]


  /**
   * Initializes the container (unless the current state data is already `WarmData`), runs the
   * activation in it, sends the acknowledgement message(s), collects logs and stores the
   * resulting activation record.
   *
   * The activation id is registered in `runningActivations` on entry and removed again once the
   * activation (including log collection) has completed, whether it succeeded or failed.
   *
   * @param container the container used to initialize and run the action
   * @param clientProxy actor reference stored in the `WarmData` announced after initialization
   * @param action the action to initialize and run
   * @param msg the activation message describing this invocation
   * @param resumeRun set when this invocation resumes an earlier run; it is forwarded to the
   *                  container run as a flag and turns a `ContainerHealthError` into a
   *                  `ContainerHealthErrorWithResumedRun`
   * @param tid the transaction id used for logging, markers and acknowledgement messages
   * @return a future completing with the activation (with logs attached) if its response is a
   *         success or an application error; failing with `ActivationUnsuccessfulError` for any
   *         other response, with the `ActivationLogReadingError` if log collection failed, or with
   *         `ContainerHealthErrorWithResumedRun` for a health error during a resumed run
   */
  private def initializeAndRunActivation(
    container: Container,
    clientProxy: ActorRef,
    action: ExecutableWhiskAction,
    msg: ActivationMessage,
    resumeRun: Option[RunActivation] = None)(implicit tid: TransactionId): Future[WhiskActivation] = {
    // Add the activation to runningActivations set
    runningActivations.put(msg.activationId.asString, true)

    val actionTimeout = action.limits.timeout.duration

    val (env, parameters) = ContainerProxy.partitionArguments(msg.content, msg.initArgs)

    val environment = Map(
      "namespace" -> msg.user.namespace.name.toJson,
      "action_name" -> msg.action.qualifiedNameWithLeadingSlash.toJson,
      "action_version" -> msg.action.version.toJson,
      "activation_id" -> msg.activationId.toString.toJson,
      "transaction_id" -> msg.transid.id.toJson)

    // if the action requests the api key to be injected into the action context, add it here;
    // treat a missing annotation as requesting the api key for backward compatibility
    val authEnvironment = {
      if (action.annotations.isTruthy(Annotations.ProvideApiKeyAnnotationName, valueForNonExistent = true)) {
        msg.user.authkey.toEnvironment.fields
      } else Map.empty
    }

    // Only initialize iff we haven't yet warmed the container
    val initialize = stateData match {
      case _: WarmData =>
        Future.successful(None)
      case _ =>
        // the init environment carries the same entries as the run environment, with every key
        // upper-cased and prefixed with "__OW_"
        val owEnv = (authEnvironment ++ environment ++ Map(
          "deadline" -> (Instant.now.toEpochMilli + actionTimeout.toMillis).toString.toJson)) map {
          case (key, value) => "__OW_" + key.toUpperCase -> value
        }
        container
          .initialize(action.containerInitializer(env ++ owEnv), actionTimeout, action.limits.concurrency.maxConcurrent)
          .map(Some(_))
    }

    val activation: Future[WhiskActivation] = initialize
      .flatMap { initInterval =>
        // immediately setup warmedData for use (before first execution) so that concurrent actions can use it asap
        if (initInterval.isDefined) {
          stateData match {
            case _: InitializedData =>
              self ! InitCodeCompleted(
                WarmData(container, msg.user.namespace.name.asString, action, msg.revision, Instant.now, clientProxy))

            case other =>
              // An unexpected state is reported explicitly instead of being silently dropped.
              // The activation is deliberately not failed here: the run below still proceeds,
              // only the InitCodeCompleted notification is skipped.
              logging.error(
                this,
                s"unexpected state data ${other.getClass.getSimpleName} after initialization, " +
                  "container is not announced as warm")
          }
        }
        val env = authEnvironment ++ environment ++ Map(
          // compute deadline on invoker side avoids discrepancies inside container
          // but potentially under-estimates actual deadline
          "deadline" -> (Instant.now.toEpochMilli + actionTimeout.toMillis).toString.toJson)

        container
          .run(
            parameters,
            env.toJson.asJsObject,
            actionTimeout,
            action.limits.concurrency.maxConcurrent,
            msg.user.limits.allowedMaxPayloadSize,
            msg.user.limits.allowedTruncationSize,
            resumeRun.isDefined)(msg.transid)
          .map {
            case (runInterval, response) =>
              // when an init happened, extend the reported interval backwards by the init duration
              val initRunInterval = initInterval
                .map(i => Interval(runInterval.start.minusMillis(i.duration.toMillis), runInterval.end))
                .getOrElse(runInterval)
              constructWhiskActivation(
                action,
                msg,
                initInterval,
                initRunInterval,
                runInterval.duration >= actionTimeout,
                response)
          }
      }
      .recoverWith {
        case h: ContainerHealthError if resumeRun.isDefined =>
          // health error occurs
          logging.error(this, s"caught healthchek check error while running activation")
          // the guard above guarantees resumeRun is defined
          Future.failed(ContainerHealthErrorWithResumedRun(h.tid, h.msg, resumeRun.get))

        case InitializationError(interval, response) =>
          // a failed initialization still yields an activation record carrying the init response
          Future.successful(
            constructWhiskActivation(
              action,
              msg,
              Some(interval),
              interval,
              interval.duration >= actionTimeout,
              response))

        case t =>
          // Actually, this should never happen - but we want to make sure to not miss a problem
          logging.error(this, s"caught unexpected error while running activation: $t")
          Future.successful(
            constructWhiskActivation(
              action,
              msg,
              None,
              Interval.zero,
              false,
              ExecutionResponse.whiskError(Messages.abnormalRun)))
      }

    val splitAckMessagesPendingLogCollection = collectLogs.logsToBeCollected(action)
    // Sending an active ack is an asynchronous operation. The result is forwarded as soon as
    // possible for blocking activations so that dependent activations can be scheduled. The
    // completion message which frees a load balancer slot is sent after the active ack future
    // completes to ensure proper ordering.
    // NOTE(review): if sendActiveAck itself returns a Future, `map` produces a nested future and
    // `sendResult` completes once the ack has been initiated rather than delivered - confirm
    // whether `flatMap` is intended.
    val sendResult = if (msg.blocking) {
      activation.map { result =>
        val ackMsg =
          if (splitAckMessagesPendingLogCollection) ResultMessage(tid, result)
          else CombinedCompletionAndResultMessage(tid, result, instance)
        sendActiveAck(tid, result, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid, ackMsg)
      }
    } else {
      // For non-blocking request, do not forward the result.
      if (splitAckMessagesPendingLogCollection) Future.successful(())
      else
        activation.map { result =>
          val ackMsg = CompletionMessage(tid, result, instance)
          sendActiveAck(tid, result, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid, ackMsg)
        }
    }

    // report the invoker as healthy unless the activation ended in a whisk (system) error
    activation.foreach { activation =>
      val healthMessage = HealthMessage(!activation.response.isWhiskError)
      invokerHealthManager ! healthMessage
    }

    val context = UserContext(msg.user)

    // Adds logs to the raw activation.
    val activationWithLogs: Future[Either[ActivationLogReadingError, WhiskActivation]] = activation
      .flatMap { activation =>
        // Skips log collection entirely, if the limit is set to 0
        if (action.limits.logs.asMegaBytes == 0.MB) {
          Future.successful(Right(activation))
        } else {
          val start = tid.started(this, LoggingMarkers.INVOKER_COLLECT_LOGS, logLevel = InfoLevel)
          collectLogs(tid, msg.user, activation, container, action)
            .andThen {
              case Success(_) => tid.finished(this, start)
              case Failure(t) => tid.failed(this, start, s"reading logs failed: $t")
            }
            .map(logs => Right(activation.withLogs(logs)))
            .recover {
              // keep whatever logs were collected before the failure
              case LogCollectingException(logs) =>
                Left(ActivationLogReadingError(activation.withLogs(logs)))
              case _ =>
                Left(ActivationLogReadingError(activation.withLogs(ActivationLogs(Vector(Messages.logFailure)))))
            }
        }
      }

    // Both the Left and the Right side carry an activation; it is acknowledged and stored either way.
    activationWithLogs
      .map(_.fold(_.activation, identity))
      .foreach { activation =>
        // Sending the completion message to the controller after the active ack ensures proper ordering
        // (result is received before the completion message for blocking invokes).
        if (splitAckMessagesPendingLogCollection) {
          sendResult.onComplete(
            _ =>
              sendActiveAck(
                tid,
                activation,
                msg.blocking,
                msg.rootControllerIndex,
                msg.user.namespace.uuid,
                CompletionMessage(tid, activation, instance)))
        }

        // Storing the record. Entirely asynchronous and not waited upon.
        storeActivation(tid, activation, msg.blocking, context)
      }

    // Disambiguate activation errors and transform the Either into a failed/successful Future respectively.
    activationWithLogs
      .andThen {
        // remove activationId from runningActivations in any case
        case _ => runningActivations.remove(msg.activationId.asString)
      }
      .flatMap {
        case Right(act) if !act.response.isSuccess && !act.response.isApplicationError =>
          Future.failed(ActivationUnsuccessfulError(act))
        case Left(error) => Future.failed(error)
        case Right(act)  => Future.successful(act)
      }
  }