in discovery-kubernetes-api/src/main/scala/org/apache/pekko/discovery/kubernetes/KubernetesApiServiceDiscovery.scala [132:201]
/**
 * Resolves a service lookup by listing pods through the Kubernetes API.
 *
 * The pod label selector is derived from `query.serviceName`, the pod list is requested for
 * `podNamespace`, and the returned pods are turned into addresses by `targets`, using
 * `query.portName` together with the configured pod domain, raw-IP flag and container name.
 *
 * @param query          the lookup to resolve; its `serviceName` and `portName` are used
 * @param resolveTimeout limit passed to `toStrict` when reading the response entity into memory;
 *                       this is the only place in this method where it is applied
 * @return a future of the resolved addresses. The future fails with a `KubernetesApiException`
 *         when the API server answers with any status other than 200, and with the
 *         unmarshalling failure when a 200 response body cannot be read as a `PodList`.
 *         NOTE(review): when `podRequest` yields no request, `optionToFuture` presumably fails
 *         the future with the supplied message - confirm against its definition.
 */
override def lookup(query: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = {
  val labelSelector = settings.podLabelSelector(query.serviceName)

  log.info(
    "Querying for pods with label selector: [{}]. Namespace: [{}]. Port: [{}]",
    labelSelector,
    podNamespace,
    query.portName)

  for {
    request <- optionToFuture(
      podRequest(apiToken, podNamespace, labelSelector),
      s"Unable to form request; check Kubernetes environment (expecting env vars ${settings.apiServiceHostEnvName}, ${settings.apiServicePortEnvName})")
    response <- http.singleRequest(request, clientSslContext).map(decodeResponse)
    // The entity is read fully before the status is inspected, so every branch below
    // (success and error alike) works on an in-memory body.
    entity <- response.entity.toStrict(resolveTimeout)
    podList <- {
      response.status match {
        case StatusCodes.OK =>
          // Decode the complete body into a String only when it is going to be logged;
          // passing it straight to log.debug would evaluate it even with debug disabled.
          if (log.isDebugEnabled) {
            log.debug("Kubernetes API entity: [{}]", entity.data.utf8String)
          }
          val unmarshalled = Unmarshal(entity).to[PodList]
          // Log-only side channel: the failure itself still propagates through `unmarshalled`.
          unmarshalled.failed.foreach { t =>
            log.warning(
              "Failed to unmarshal Kubernetes API response. Status code: [{}]; Response body: [{}]. Ex: [{}]",
              response.status.value,
              entity,
              t.getMessage)
          }
          unmarshalled
        case StatusCodes.Forbidden =>
          // The body is logged asynchronously; the lookup fails regardless of that outcome.
          Unmarshal(entity).to[String].foreach { body =>
            log.warning(
              "Forbidden to communicate with Kubernetes API server; check RBAC settings. Response: [{}]",
              body)
          }
          Future.failed(
            new KubernetesApiException("Forbidden when communicating with the Kubernetes API. Check RBAC settings."))
        case other =>
          // Any other status is treated as an error; the body is logged for diagnosis.
          Unmarshal(entity).to[String].foreach { body =>
            log.warning(
              "Non-200 when communicating with Kubernetes API server. Status code: [{}]. Response body: [{}]",
              other,
              body)
          }
          Future.failed(new KubernetesApiException(s"Non-200 from Kubernetes API server: $other"))
      }
    }
  } yield {
    val addresses =
      targets(podList, query.portName, podNamespace, settings.podDomain, settings.rawIp, settings.containerName)

    // Pods were returned but none produced an address: report the ports found on the pods
    // next to the configured port name. The port set is only built when it will be logged.
    if (addresses.isEmpty && podList.items.nonEmpty && log.isInfoEnabled) {
      val containerPortNames = podList.items.flatMap(_.spec).flatMap(_.containers).flatMap(_.ports).flatten.toSet
      log.info(
        "No targets found from pod list. Is the correct port name configured? Current configuration: [{}]. Ports on pods: [{}]",
        query.portName,
        containerPortNames)
    }

    Resolved(
      serviceName = query.serviceName,
      addresses = addresses)
  }
}