in core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala [190:266]
override def nextRun(r: Run) = copy(lastUsed = Instant.now, activeActivationCount = activeActivationCount + 1)
//track the resuming run for easily referring to the action being resumed (it may fail and be resent)
def withoutResumeRun() = this.copy(resumeRun = None)
def withResumeRun(job: Run) = this.copy(resumeRun = Some(job))
}
// Events received by the actor
case class Start(exec: CodeExec[_], memoryLimit: ByteSize, ttl: Option[FiniteDuration] = None)
case class Run(action: ExecutableWhiskAction, msg: ActivationMessage, retryLogDeadline: Option[Deadline] = None)
case object Remove
case class HealthPingEnabled(enabled: Boolean)
// Events sent by the actor
case class NeedWork(data: ContainerData)
case object ContainerPaused
case class ContainerRemoved(replacePrewarm: Boolean) // when container is destroyed
case object RescheduleJob // job is sent back to parent and could not be processed because container is being destroyed
case class PreWarmCompleted(data: PreWarmedData)
case class InitCompleted(data: WarmedData)
case object RunCompleted
/**
* A proxy that wraps a Container. It is used to keep track of the lifecycle
* of a container and to guarantee a contract between the client of the container
* and the container itself.
*
* The contract is as follows:
* 1. If action.limits.concurrency.maxConcurrent == 1:
* Only one job is to be sent to the ContainerProxy at one time. ContainerProxy
* will delay all further jobs until a previous job has finished.
*
* 1a. The next job can be sent to the ContainerProxy after it indicates available
* capacity by sending NeedWork to its parent.
*
* 2. If action.limits.concurrency.maxConcurrent > 1:
* Parent must coordinate with ContainerProxy to attempt to send only data.action.limits.concurrency.maxConcurrent
* jobs for concurrent processing.
*
* Since the current job count is only periodically sent to parent, the number of jobs
* sent to ContainerProxy may exceed data.action.limits.concurrency.maxConcurrent,
* in which case jobs are buffered, so that only a max of action.limits.concurrency.maxConcurrent
* are ever sent into the container concurrently. Parent will NOT be signalled to send more jobs until
* buffered jobs are completed, but their order is not guaranteed.
*
* 2a. The next job can be sent to the ContainerProxy after ContainerProxy has "concurrent capacity",
* indicated by sending NeedWork to its parent.
*
* 3. A Remove message can be sent at any point in time. Like multiple jobs though,
* it will be delayed until the currently running job finishes.
*
* @constructor
* @param factory a function generating a Container
* @param sendActiveAck a function sending the activation via active ack
* @param storeActivation a function storing the activation in a persistent store
* @param unusedTimeout time after which the container is automatically thrown away
* @param pauseGrace time to wait for new work before pausing the container
*/
class ContainerProxy(factory: (TransactionId,
String,
ImageName,
Boolean,
ByteSize,
Int,
Option[Double],
Option[ExecutableWhiskAction]) => Future[Container],
sendActiveAck: ActiveAck,
storeActivation: (TransactionId, WhiskActivation, Boolean, UserContext) => Future[Any],
collectLogs: LogsCollector,
instance: InvokerInstanceId,
poolConfig: ContainerPoolConfig,
healtCheckConfig: ContainerProxyHealthCheckConfig,
activationErrorLoggingConfig: ContainerProxyActivationErrorLogConfig,
unusedTimeout: FiniteDuration,
pauseGrace: FiniteDuration,
testTcp: Option[ActorRef])
extends FSM[ContainerState, ContainerData]
with Stash {