in lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesApiImpl.scala [54:97]
override def updateLeaseResource(
leaseName: String,
ownerName: String,
version: String,
time: Long = System.currentTimeMillis()): Future[Either[LeaseResource, LeaseResource]] = {
val lcr = LeaseCustomResource(Metadata(leaseName, Some(version)), Spec(ownerName, System.currentTimeMillis()))
for {
entity <- Marshal(lcr).to[RequestEntity]
response <- {
log.debug("updating {} to {}", leaseName, lcr)
makeRequest(
requestForPath(pathForLease(leaseName), method = HttpMethods.PUT, entity),
s"Timed out updating lease [$leaseName] to owner [$ownerName]. It is not known if the update happened")
}
result <- response.status match {
case StatusCodes.OK =>
Unmarshal(response.entity)
.to[LeaseCustomResource]
.map(updatedLcr => {
log.debug("LCR after update: {}", updatedLcr)
Right(toLeaseResource(updatedLcr))
})
case StatusCodes.Conflict =>
getLeaseResource(leaseName).flatMap {
case None =>
Future.failed(
new LeaseException(s"GET after PUT conflict did not return a lease. Lease[$leaseName-$ownerName]"))
case Some(lr) =>
log.debug("LeaseResource read after conflict: {}", lr)
Future.successful(Left(lr))
}
case StatusCodes.Unauthorized =>
handleUnauthorized(response)
case unexpected =>
Unmarshal(response.entity)
.to[String]
.flatMap(body => {
Future.failed(
new LeaseException(
s"PUT for lease $leaseName returned unexpected status code $unexpected. Body: $body"))
})
}
} yield result
}