private[queue] def decide()

in core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala [49:259]


  /**
   * Decides how the queue should react to the given snapshot: add containers, skip, pause,
   * or toggle namespace throttling.
   *
   * - If the namespace limit or the action limit is not positive (an error case), the result is
   *   `Skip` while flushing and `Pausing` otherwise.
   * - Otherwise a capacity is computed:
   *   - When `allowOverProvisionBeforeThrottle` is enabled and the action has no containers, the
   *     capacity is 1 if there is headroom under the over-provisioned namespace limit, else 0.
   *   - In every other case it is the smaller of the remaining namespace and action capacities.
   * - With no capacity left, a `Running` queue enables namespace throttling unless it still has
   *   over-provision headroom. A `NamespaceThrottled` queue that has headroom disables throttling.
   *   Every other case is skipped.
   * - With capacity available, the number of containers to add is derived from:
   *   - the queue state,
   *   - the average activation duration (if known),
   *   - the number of stale activations.
   *
   * @param snapshot the current state of this queue and of its namespace
   * @return the scheduling decision and the number of containers to create, wrapped in a `Future`;
   *         some branches delegate the final decision to `addServersIfPossible`
   */
  private[queue] def decide(snapshot: QueueSnapshot) = {
    val QueueSnapshot(
      initialized,
      incoming,
      currentMsg,
      existing,
      inProgress,
      staleActivationNum,
      existingContainerCountInNs,
      inProgressContainerCountInNs,
      averageDuration,
      namespaceLimit,
      actionLimit,
      maxActionConcurrency,
      stateName,
      _) = snapshot
    val totalContainers = existing + inProgress
    val availableMsg = currentMsg + incoming.get()
    val actionCapacity = actionLimit - totalContainers
    val namespaceCapacity = namespaceLimit - existingContainerCountInNs - inProgressContainerCountInNs
    // room left when the namespace may exceed its limit by the configured over-provision ratio
    val overProvisionCapacity =
      ceiling(namespaceLimit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) -
        existingContainerCountInNs - inProgressContainerCountInNs

    if (Math.min(namespaceLimit, actionLimit) <= 0) {
      // this is an error case, the limit should be bigger than 0
      stateName match {
        case Flushing => Future.successful(DecisionResults(Skip, 0))
        case _        => Future.successful(DecisionResults(Pausing, 0))
      }
    } else {
      val capacity = if (schedulingConfig.allowOverProvisionBeforeThrottle && totalContainers == 0) {
        // if space available within the over provision ratio amount above namespace limit, create one container for new
        // action so namespace traffic can attempt to re-balance without blocking entire action
        if (overProvisionCapacity > 0) 1 else 0
      } else {
        Math.min(namespaceCapacity, actionCapacity)
      }

      if (capacity <= 0) {
        stateName match {
          // Throttling drops the queued messages only when the action has no container at all
          // (dropMsg = totalContainers == 0); those activations fail with "too many requests".
          // When containers already exist, activations are not treated as failures and are still
          // delivered to the existing containers.
          case Running
              if !schedulingConfig.allowOverProvisionBeforeThrottle || overProvisionCapacity <= 0 =>
            logging.info(
              this,
              s"there is no capacity activations will be dropped or throttled, (availableMsg: $availableMsg totalContainers: $totalContainers, actionLimit: $actionLimit, namespaceLimit: $namespaceLimit, namespaceContainers: $existingContainerCountInNs, namespaceInProgressContainer: $inProgressContainerCountInNs) [$invocationNamespace:$action]")
            Future.successful(DecisionResults(EnableNamespaceThrottling(dropMsg = totalContainers == 0), 0))

          // throttled, but over-provisioning is allowed and there is still headroom: lift the throttle
          case NamespaceThrottled if schedulingConfig.allowOverProvisionBeforeThrottle && overProvisionCapacity > 0 =>
            Future.successful(DecisionResults(DisableNamespaceThrottling, 0))

          case _ =>
            // do nothing; no need to print any messages if the state is already NamespaceThrottled
            Future.successful(DecisionResults(Skip, 0))
        }
      } else {
        (stateName, averageDuration) match {
          // there is no container
          case (Running, None) if totalContainers == 0 && !initialized =>
            logging.info(
              this,
              s"add one initial container if totalContainers($totalContainers) == 0 [$invocationNamespace:$action]")
            Future.successful(DecisionResults(AddInitialContainer, 1))

          // Todo: when disabling throttling we may create some containers.
          case (NamespaceThrottled, _) =>
            Future.successful(DecisionResults(DisableNamespaceThrottling, 0))

          // this is an exceptional case, create a container immediately
          case (Running, _) if totalContainers == 0 && availableMsg > 0 =>
            logging.info(
              this,
              s"add one container if totalContainers($totalContainers) == 0 && availableMsg($availableMsg) > 0 [$invocationNamespace:$action]")
            Future.successful(DecisionResults(AddContainer, 1))

          case (Flushing, _) if totalContainers == 0 =>
            logging.info(
              this,
              s"add one container case Flushing if totalContainers($totalContainers) == 0 [$invocationNamespace:$action]")
            // it is highly likely the queue could not create an initial container if the limit is 0
            Future.successful(DecisionResults(AddInitialContainer, 1))

          // there is no activation result yet, but some activations became stale
          // it may cause some over-provisioning if it takes much time to create a container and execution time is short.
          // but it is a kind of trade-off and we place latency on top of over-provisioning
          case (Running, None) if staleActivationNum > 0 =>
            // without a known duration, size by the action's concurrency and discount containers already in progress
            val num = ceiling(staleActivationNum.toDouble / maxActionConcurrency.toDouble) - inProgress
            // if it tries to create more containers than existing messages, we just create shortage
            val actualNum = if (num > availableMsg) availableMsg else num
            addServersIfPossible(
              existing,
              inProgress,
              0,
              availableMsg,
              capacity,
              namespaceCapacity,
              actualNum,
              staleActivationNum,
              0.0,
              Running)

          // need more containers and a message is already processed
          case (Running, Some(duration)) =>
            // extra protection in case the duration is somehow zero or negative
            val containerThroughput = if (duration <= 0) {
              maxActionConcurrency
            } else {
              (staleThreshold / duration) * maxActionConcurrency
            }
            val expectedTps = containerThroughput * totalContainers
            val availableNonStaleActivations = availableMsg - staleActivationNum

            // Containers needed for the stale activations, minus those already being created.
            // NOTE(review): this is negative when inProgress exceeds the estimate. It is then added
            // in the first branch below, whose `num` already subtracts inProgress. Confirm this
            // double discount is intended.
            val staleContainerProvision =
              if (staleActivationNum > 0) {
                val num = ceiling(staleActivationNum.toDouble / containerThroughput)
                // if it tries to create more containers than existing messages, we just create shortage
                (if (num > staleActivationNum) staleActivationNum else num) - inProgress
              } else {
                0
              }

            if (availableNonStaleActivations >= expectedTps && totalContainers < availableNonStaleActivations && duration > 0) {
              val num = ceiling((availableNonStaleActivations / containerThroughput) - existing - inProgress)
              // if it tries to create more containers than existing messages, we just create shortage
              val actualNum =
                if (num + totalContainers > availableNonStaleActivations) availableNonStaleActivations - totalContainers
                else num
              addServersIfPossible(
                existing,
                inProgress,
                containerThroughput,
                availableMsg,
                capacity,
                namespaceCapacity,
                actualNum + staleContainerProvision,
                staleActivationNum,
                duration,
                Running)
            } else if (staleContainerProvision > 0) {
              addServersIfPossible(
                existing,
                inProgress,
                containerThroughput,
                availableMsg,
                capacity,
                namespaceCapacity,
                staleContainerProvision,
                staleActivationNum,
                duration,
                Running)
            } else {
              Future.successful(DecisionResults(Skip, 0))
            }

          // generally we assume there are enough containers for actions when shutting down the scheduler
          // but if there were already too many activation in the queue with not enough containers,
          // we should add more containers to quickly consume those messages.
          // this case is for that as a last resort.
          case (Removing, Some(duration)) if staleActivationNum > 0 =>
            // extra protection in case the duration is somehow zero or negative
            val containerThroughput = if (duration <= 0) {
              maxActionConcurrency
            } else {
              (staleThreshold / duration) * maxActionConcurrency
            }
            val num = ceiling(staleActivationNum.toDouble / containerThroughput)
            // if it tries to create more containers than existing messages, we just create shortage
            val actualNum = (if (num > staleActivationNum) staleActivationNum else num) - inProgress
            // NOTE(review): the state passed here is Running even though the queue is Removing — confirm intent
            addServersIfPossible(
              existing,
              inProgress,
              containerThroughput,
              availableMsg,
              capacity,
              namespaceCapacity,
              actualNum,
              staleActivationNum,
              duration,
              Running)

          // same as the above case, but no duration is known yet
          case (Removing, None) if staleActivationNum > 0 =>
            // without a known duration, size by the action's concurrency and discount containers already in progress
            val num = ceiling(staleActivationNum.toDouble / maxActionConcurrency.toDouble) - inProgress
            // if it tries to create more containers than existing messages, we just create shortage
            val actualNum = if (num > availableMsg) availableMsg else num
            // NOTE(review): the state passed here is Running even though the queue is Removing — confirm intent
            addServersIfPossible(
              existing,
              inProgress,
              0,
              availableMsg,
              capacity,
              namespaceCapacity,
              actualNum,
              staleActivationNum,
              0.0,
              Running)

          // do nothing
          case _ =>
            Future.successful(DecisionResults(Skip, 0))
        }
      }
    }
  }