override def publish()

in core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala [107:164]


  /**
   * Publishes an activation message to the Kafka topic of the scheduler serving the action's queue.
   *
   * The scheduler is resolved in this order:
   *  1. the endpoint cached in `queueEndpoints` for the queue key,
   *  1. the endpoint stored in etcd under the queue key (cached in `queueEndpoints` once parsed),
   *  1. a scheduler picked by `FPCPoolBalancer.schedule`, on which queue creation is requested.
   *
   * If a resolved endpoint is not present in `schedulerEndpoints`, a randomly chosen known scheduler
   * is used instead.
   *
   * @param action the action to invoke
   * @param msg the activation message to send
   * @param transid the transaction id used for logging
   * @return a future that completes once the message has been handed to `sendActivationToKafka`; its value
   *         is the result of `setupActivation`. The outer future fails with
   *         `LoadBalancerException("No scheduler endpoint available")` when no scheduler can be resolved.
   */
  override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
    implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {

    val noSchedulerMessage = "No scheduler endpoint available"

    val topicBaseName: Future[String] = if (schedulerEndpoints.isEmpty) {
      logging.error(
        this,
        s"Failed to invoke action ${action.fullyQualifiedName(false)}, error: no scheduler endpoint available")
      Future.failed(LoadBalancerException(noSchedulerMessage))
    } else {
      val invocationNamespace = msg.user.namespace.name.asString
      val key = QueueKeys.queue(invocationNamespace, action.fullyQualifiedName(false), true)

      // Random known scheduler; None (instead of an exception) if the map was emptied concurrently.
      def anyScheduler = Random.shuffle(schedulerEndpoints.values.toList).headOption

      // Fallback when no usable queue endpoint is known: pick a scheduler and request queue creation on it.
      def scheduleNewQueue() =
        FPCPoolBalancer.schedule(schedulerEndpoints.values.toIndexedSeq).map { scheduler =>
          createQueue(invocationNamespace, action.toWhiskAction, msg.action, msg.revision, scheduler)
          scheduler.sid.toString
        }

      // Lifts a resolved scheduler id into a future, failing explicitly when none was found.
      def sidOrFail(sid: Option[String]): Future[String] =
        sid.fold[Future[String]](Future.failed(LoadBalancerException(noSchedulerMessage)))(Future.successful)

      queueEndpoints.get(key) match {
        case Some(endPoint) =>
          sidOrFail(schedulerEndpoints.get(endPoint).orElse(anyScheduler).map(_.sid.toString))
        case None =>
          etcdClient
            .get(key)
            .map { res =>
              res.getKvsList.asScala.headOption match {
                case Some(kv) =>
                  val rawEndPoint: String = kv.getValue
                  SchedulerEndpoints
                    .parse(rawEndPoint)
                    .map { endPoint =>
                      // Remember the endpoint so later invocations can skip the etcd lookup.
                      queueEndpoints.update(kv.getKey, endPoint)
                      schedulerEndpoints.get(endPoint).orElse(anyScheduler).map(_.sid.toString)
                    }
                    // The stored value could not be parsed: treat the queue as not yet created.
                    .getOrElse(scheduleNewQueue())
                case None =>
                  scheduleNewQueue()
              }
            }
            .recoverWith {
              case t =>
                // Keep the error reported to callers unchanged, but do not lose the underlying cause.
                logging.error(
                  this,
                  s"Failed to resolve scheduler for action ${action.fullyQualifiedName(false)}, error: ${t.getMessage}")
                Future.failed(LoadBalancerException(noSchedulerMessage))
            }
            .flatMap(sidOrFail)
      }
    }

    topicBaseName.flatMap { baseName =>
      val topicName = Controller.topicPrefix + baseName
      val activationResult = setupActivation(msg, action)
      sendActivationToKafka(messageProducer, msg, topicName).map(_ => activationResult)
    }
  }