in core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala [776:952]
/**
 * Runs the given job in the given container, initializing the container first unless the
 * proxy's current `stateData` is already `WarmedData`.
 *
 * Besides producing the activation record, this method also
 *  - notifies this actor with `InitCompleted` as soon as an initialization has finished,
 *  - sends the active ack(s) (result and/or completion message) via `sendActiveAck`,
 *  - collects the logs when `collectLogs.logsToBeCollected` says so, and
 *  - stores the resulting activation via `storeActivation`.
 *
 * @param container the container to initialize (if needed) and run the action in
 * @param job the run request; supplies the action, the activation message and the user
 * @param reschedule forwarded unchanged to `container.run` (defaults to `false`);
 *                   NOTE(review): exact semantics are defined by `Container.run` - confirm there
 * @param tid transaction id used for the active acks, log markers, log collection and storing;
 *            note that `container.run` is invoked with `job.msg.transid` instead
 * @return a future that succeeds with the activation when its response is a success or an
 *         application error; it fails with `ActivationUnsuccessfulError` for any other response,
 *         with `ActivationLogReadingError` when reading the logs failed, and with the original
 *         `ContainerHealthError` when one was raised during init/run
 */
def initializeAndRun(container: Container, job: Run, reschedule: Boolean = false)(
implicit tid: TransactionId): Future[WhiskActivation] = {
// The same timeout bounds the init call, the run call and the advertised deadline.
val actionTimeout = job.action.limits.timeout.duration
val unlockedArgs =
ContainerProxy.unlockArguments(job.msg.content, job.msg.lockedArgs, ParameterEncryption.singleton)
// `env` is handed to the container initializer, `parameters` is passed to the run call.
val (env, parameters) = ContainerProxy.partitionArguments(unlockedArgs, job.msg.initArgs)
// Activation context made available to the action, both at init time (with "__OW_" prefixed,
// upper-cased keys) and at run time (keys as written here).
val environment = Map(
"namespace" -> job.msg.user.namespace.name.toJson,
"action_name" -> job.msg.action.qualifiedNameWithLeadingSlash.toJson,
"action_version" -> job.msg.action.version.toJson,
"activation_id" -> job.msg.activationId.toString.toJson,
"transaction_id" -> job.msg.transid.id.toJson)
// if the action requests the api key to be injected into the action context, add it here;
// treat a missing annotation as requesting the api key for backward compatibility
val authEnvironment = {
if (job.action.annotations.isTruthy(Annotations.ProvideApiKeyAnnotationName, valueForNonExistent = true)) {
job.msg.user.authkey.toEnvironment.fields
} else Map.empty
}
// Only initialize iff we haven't yet warmed the container.
// The result is None when initialization was skipped and Some(init interval) otherwise.
val initialize = stateData match {
case data: WarmedData =>
Future.successful(None)
case _ =>
// Init-time context: same entries as at run time plus a deadline, with every key rewritten
// to "__OW_" + upper-cased key.
val owEnv = (authEnvironment ++ environment ++ Map(
"deadline" -> (Instant.now.toEpochMilli + actionTimeout.toMillis).toString.toJson)) map {
case (key, value) => "__OW_" + key.toUpperCase -> value
}
container
.initialize(
job.action.containerInitializer(env ++ owEnv),
actionTimeout,
job.action.limits.concurrency.maxConcurrent,
Some(job.action.toWhiskAction))
.map(Some(_))
}
// Runs the action once initialization (if any) is done and turns the outcome into an activation.
val activation: Future[WhiskActivation] = initialize
.flatMap { initInterval =>
// Immediately set up WarmedData for use (before the first execution) so that concurrent
// activations can use it as soon as possible. Only done when an init actually happened.
if (initInterval.isDefined) {
self ! InitCompleted(WarmedData(container, job.msg.user.namespace.name, job.action, Instant.now, 1))
}
// Run-time context; the deadline is recomputed here, i.e. after initialization has finished.
val env = authEnvironment ++ environment ++ Map(
// compute deadline on invoker side avoids discrepancies inside container
// but potentially under-estimates actual deadline
"deadline" -> (Instant.now.toEpochMilli + actionTimeout.toMillis).toString.toJson)
container
.run(
parameters,
env.toJson.asJsObject,
actionTimeout,
job.action.limits.concurrency.maxConcurrent,
job.msg.user.limits.allowedMaxPayloadSize,
job.msg.user.limits.allowedTruncationSize,
reschedule)(job.msg.transid)
.map {
case (runInterval, response) =>
// When an init happened, the reported interval is the run interval with its start moved
// back by the init duration; otherwise it is the run interval itself.
val initRunInterval = initInterval
.map(i => Interval(runInterval.start.minusMillis(i.duration.toMillis), runInterval.end))
.getOrElse(runInterval)
// The boolean argument is true when the run lasted at least as long as the action timeout
// (presumably the "timed out" flag of constructWhiskActivation - confirm at its definition).
ContainerProxy.constructWhiskActivation(
job,
initInterval,
initRunInterval,
runInterval.duration >= actionTimeout,
response)
}
}
.recoverWith {
// Health errors are passed on as a failed future; no activation is constructed for them.
case h: ContainerHealthError =>
Future.failed(h)
// A failed initialization still yields an activation, built from the init interval and response.
case InitializationError(interval, response) =>
Future.successful(
ContainerProxy
.constructWhiskActivation(job, Some(interval), interval, interval.duration >= actionTimeout, response))
case t =>
// Actually, this should never happen - but we want to make sure to not miss a problem
logging.error(this, s"caught unexpected error while running activation: ${t}")
Future.successful(
ContainerProxy.constructWhiskActivation(
job,
None,
Interval.zero,
false,
ActivationResponse.whiskError(Messages.abnormalRun)))
}
// True when logs have to be collected for this action; in that case the result and the completion
// are sent as two separate messages, with the completion following after log collection.
val splitAckMessagesPendingLogCollection = collectLogs.logsToBeCollected(job.action)
// Sending an active ack is an asynchronous operation. The result is forwarded as soon as
// possible for blocking activations so that dependent activations can be scheduled. The
// completion message which frees a load balancer slot is sent after the active ack future
// completes to ensure proper ordering.
val sendResult = if (job.msg.blocking) {
activation.map { result =>
// Split mode: result only (completion follows later); otherwise one combined message.
val msg =
if (splitAckMessagesPendingLogCollection) ResultMessage(tid, result)
else CombinedCompletionAndResultMessage(tid, result, instance)
sendActiveAck(tid, result, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, msg)
}
} else {
// For non-blocking request, do not forward the result.
// In split mode nothing is sent here; the completion message is sent after log collection below.
if (splitAckMessagesPendingLogCollection) Future.successful(())
else
activation.map { result =>
val msg = CompletionMessage(tid, result, instance)
sendActiveAck(tid, result, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, msg)
}
}
val context = UserContext(job.msg.user)
// Adds logs to the raw activation.
// Right: the activation (with logs when they were collected).
// Left: log reading failed; the error carries the activation with whatever logs are available.
val activationWithLogs: Future[Either[ActivationLogReadingError, WhiskActivation]] = activation
.flatMap { activation =>
// Skips log collection entirely when logsToBeCollected reported false for this action
// (presumably e.g. when the log limit is set to 0 - confirm in the log store implementation).
if (!splitAckMessagesPendingLogCollection) {
Future.successful(Right(activation))
} else {
val start = tid.started(this, LoggingMarkers.INVOKER_COLLECT_LOGS, logLevel = InfoLevel)
collectLogs(tid, job.msg.user, activation, container, job.action)
.andThen {
// Close the log-collection marker either way; andThen does not alter the future's result.
case Success(_) => tid.finished(this, start)
case Failure(t) => tid.failed(this, start, s"reading logs failed: $t")
}
.map(logs => Right(activation.withLogs(logs)))
.recover {
// Keep the logs carried by the exception.
case LogCollectingException(logs) =>
Left(ActivationLogReadingError(activation.withLogs(logs)))
// Any other failure: attach a single log line stating that log reading failed.
case _ =>
Left(ActivationLogReadingError(activation.withLogs(ActivationLogs(Vector(Messages.logFailure)))))
}
}
}
// Side effects once the logs are settled: send the pending completion message (split mode only)
// and store the activation. Both sides of the Either carry an activation, so this also runs
// when log reading failed. It does not run when `activationWithLogs` itself is a failed future.
activationWithLogs
.map(_.fold(_.activation, identity))
.foreach { activation =>
// Sending the completion message to the controller after the active ack ensures proper ordering
// (result is received before the completion message for blocking invokes).
// onComplete ignores the outcome of sendResult, so the completion is sent even if that ack failed.
if (splitAckMessagesPendingLogCollection) {
sendResult.onComplete(
_ =>
sendActiveAck(
tid,
activation,
job.msg.blocking,
job.msg.rootControllerIndex,
job.msg.user.namespace.uuid,
CompletionMessage(tid, activation, instance)))
}
storeActivation(tid, activation, job.msg.blocking, context)
}
// Disambiguate activation errors and transform the Either into a failed/successful Future respectively.
activationWithLogs.flatMap {
// Success and application errors are returned as a successful future.
case Right(act) if act.response.isSuccess || act.response.isApplicationError =>
if (act.response.isApplicationError && activationErrorLoggingConfig.applicationErrors) {
logTruncatedError(act)
}
Future.successful(act)
// Every other response fails the future; container and whisk errors are logged if configured.
case Right(act) =>
if ((act.response.isContainerError && activationErrorLoggingConfig.developerErrors) ||
(act.response.isWhiskError && activationErrorLoggingConfig.whiskErrors)) {
logTruncatedError(act)
}
Future.failed(ActivationUnsuccessfulError(act))
// Log reading failed: fail the future with the error.
case Left(error) => Future.failed(error)
}
}