in core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala [107:164]
override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
val topicBaseName = if (schedulerEndpoints.isEmpty) {
logging.error(
this,
s"Failed to invoke action ${action.fullyQualifiedName(false)}, error: no scheduler endpoint available")
Future.failed(LoadBalancerException("No scheduler endpoint available"))
} else {
val invocationNamespace = msg.user.namespace.name.asString
val key = QueueKeys.queue(invocationNamespace, action.fullyQualifiedName(false), true)
queueEndpoints.get(key) match {
case Some(endPoint) =>
Future.successful(
schedulerEndpoints.getOrElse(endPoint, Random.shuffle(schedulerEndpoints.toList).head._2).sid.toString)
case None =>
etcdClient
.get(key)
.map { res =>
res.getKvsList.asScala.headOption map { kv =>
val endPoint: String = kv.getValue
SchedulerEndpoints
.parse(endPoint)
.map { endPoint =>
queueEndpoints.update(kv.getKey, endPoint)
Some(
schedulerEndpoints
.getOrElse(endPoint, Random.shuffle(schedulerEndpoints.toList).head._2)
.sid
.toString)
}
.getOrElse {
FPCPoolBalancer.schedule(schedulerEndpoints.values.toIndexedSeq).map { scheduler =>
createQueue(invocationNamespace, action.toWhiskAction, msg.action, msg.revision, scheduler)
scheduler.sid.toString
}
}
} getOrElse {
FPCPoolBalancer.schedule(schedulerEndpoints.values.toIndexedSeq).map { scheduler =>
createQueue(invocationNamespace, action.toWhiskAction, msg.action, msg.revision, scheduler)
scheduler.sid.toString
}
}
}
.map { _.get }
.recoverWith {
case _ =>
Future.failed(LoadBalancerException("No scheduler endpoint available"))
}
}
}
topicBaseName.flatMap { baseName =>
val topicName = Controller.topicPrefix + baseName
val activationResult = setupActivation(msg, action)
sendActivationToKafka(messageProducer, msg, topicName).map(_ => activationResult)
}
}