in core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala [1044:1241]
private def initializeAndRunActivation(
container: Container,
clientProxy: ActorRef,
action: ExecutableWhiskAction,
msg: ActivationMessage,
resumeRun: Option[RunActivation] = None)(implicit tid: TransactionId): Future[WhiskActivation] = {
// Add the activation to runningActivations set
runningActivations.put(msg.activationId.asString, true)
val actionTimeout = action.limits.timeout.duration
val (env, parameters) = ContainerProxy.partitionArguments(msg.content, msg.initArgs)
val environment = Map(
"namespace" -> msg.user.namespace.name.toJson,
"action_name" -> msg.action.qualifiedNameWithLeadingSlash.toJson,
"action_version" -> msg.action.version.toJson,
"activation_id" -> msg.activationId.toString.toJson,
"transaction_id" -> msg.transid.id.toJson)
// if the action requests the api key to be injected into the action context, add it here;
// treat a missing annotation as requesting the api key for backward compatibility
val authEnvironment = {
if (action.annotations.isTruthy(Annotations.ProvideApiKeyAnnotationName, valueForNonExistent = true)) {
msg.user.authkey.toEnvironment.fields
} else Map.empty
}
// Only initialize iff we haven't yet warmed the container
val initialize = stateData match {
case _: WarmData =>
Future.successful(None)
case _ =>
val owEnv = (authEnvironment ++ environment ++ Map(
"deadline" -> (Instant.now.toEpochMilli + actionTimeout.toMillis).toString.toJson)) map {
case (key, value) => "__OW_" + key.toUpperCase -> value
}
container
.initialize(action.containerInitializer(env ++ owEnv), actionTimeout, action.limits.concurrency.maxConcurrent)
.map(Some(_))
}
val activation: Future[WhiskActivation] = initialize
.flatMap { initInterval =>
// immediately setup warmedData for use (before first execution) so that concurrent actions can use it asap
if (initInterval.isDefined) {
stateData match {
case _: InitializedData =>
self ! InitCodeCompleted(
WarmData(container, msg.user.namespace.name.asString, action, msg.revision, Instant.now, clientProxy))
case _ =>
Future.failed(new IllegalStateException("lease does not exist"))
}
}
val env = authEnvironment ++ environment ++ Map(
// compute deadline on invoker side avoids discrepancies inside container
// but potentially under-estimates actual deadline
"deadline" -> (Instant.now.toEpochMilli + actionTimeout.toMillis).toString.toJson)
container
.run(
parameters,
env.toJson.asJsObject,
actionTimeout,
action.limits.concurrency.maxConcurrent,
msg.user.limits.allowedMaxPayloadSize,
msg.user.limits.allowedTruncationSize,
resumeRun.isDefined)(msg.transid)
.map {
case (runInterval, response) =>
val initRunInterval = initInterval
.map(i => Interval(runInterval.start.minusMillis(i.duration.toMillis), runInterval.end))
.getOrElse(runInterval)
constructWhiskActivation(
action,
msg,
initInterval,
initRunInterval,
runInterval.duration >= actionTimeout,
response)
}
}
.recoverWith {
case h: ContainerHealthError if resumeRun.isDefined =>
// health error occurs
logging.error(this, s"caught healthchek check error while running activation")
Future.failed(ContainerHealthErrorWithResumedRun(h.tid, h.msg, resumeRun.get))
case InitializationError(interval, response) =>
Future.successful(
constructWhiskActivation(
action,
msg,
Some(interval),
interval,
interval.duration >= actionTimeout,
response))
case t =>
// Actually, this should never happen - but we want to make sure to not miss a problem
logging.error(this, s"caught unexpected error while running activation: $t")
Future.successful(
constructWhiskActivation(
action,
msg,
None,
Interval.zero,
false,
ExecutionResponse.whiskError(Messages.abnormalRun)))
}
val splitAckMessagesPendingLogCollection = collectLogs.logsToBeCollected(action)
// Sending an active ack is an asynchronous operation. The result is forwarded as soon as
// possible for blocking activations so that dependent activations can be scheduled. The
// completion message which frees a load balancer slot is sent after the active ack future
// completes to ensure proper ordering.
val sendResult = if (msg.blocking) {
activation.map { result =>
val ackMsg =
if (splitAckMessagesPendingLogCollection) ResultMessage(tid, result)
else CombinedCompletionAndResultMessage(tid, result, instance)
sendActiveAck(tid, result, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid, ackMsg)
}
} else {
// For non-blocking request, do not forward the result.
if (splitAckMessagesPendingLogCollection) Future.successful(())
else
activation.map { result =>
val ackMsg = CompletionMessage(tid, result, instance)
sendActiveAck(tid, result, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid, ackMsg)
}
}
activation.foreach { activation =>
val healthMessage = HealthMessage(!activation.response.isWhiskError)
invokerHealthManager ! healthMessage
}
val context = UserContext(msg.user)
// Adds logs to the raw activation.
val activationWithLogs: Future[Either[ActivationLogReadingError, WhiskActivation]] = activation
.flatMap { activation =>
// Skips log collection entirely, if the limit is set to 0
if (action.limits.logs.asMegaBytes == 0.MB) {
Future.successful(Right(activation))
} else {
val start = tid.started(this, LoggingMarkers.INVOKER_COLLECT_LOGS, logLevel = InfoLevel)
collectLogs(tid, msg.user, activation, container, action)
.andThen {
case Success(_) => tid.finished(this, start)
case Failure(t) => tid.failed(this, start, s"reading logs failed: $t")
}
.map(logs => Right(activation.withLogs(logs)))
.recover {
case LogCollectingException(logs) =>
Left(ActivationLogReadingError(activation.withLogs(logs)))
case _ =>
Left(ActivationLogReadingError(activation.withLogs(ActivationLogs(Vector(Messages.logFailure)))))
}
}
}
activationWithLogs
.map(_.fold(_.activation, identity))
.foreach { activation =>
// Sending the completion message to the controller after the active ack ensures proper ordering
// (result is received before the completion message for blocking invokes).
if (splitAckMessagesPendingLogCollection) {
sendResult.onComplete(
_ =>
sendActiveAck(
tid,
activation,
msg.blocking,
msg.rootControllerIndex,
msg.user.namespace.uuid,
CompletionMessage(tid, activation, instance)))
}
// Storing the record. Entirely asynchronous and not waited upon.
storeActivation(tid, activation, msg.blocking, context)
}
// Disambiguate activation errors and transform the Either into a failed/successful Future respectively.
activationWithLogs
.andThen {
// remove activationId from runningActivations in any case
case _ => runningActivations.remove(msg.activationId.asString)
}
.flatMap {
case Right(act) if !act.response.isSuccess && !act.response.isApplicationError =>
Future.failed(ActivationUnsuccessfulError(act))
case Left(error) => Future.failed(error)
case Right(act) => Future.successful(act)
}
}