in common/src/main/scala/com/gu/multimedia/storagetier/plutocore/AssetFolderLookup.scala [30:100]
def callHttp = Http()
private val multiSlashRemover = "^/{2,}".r
/**
* internal method that performs a call to pluto, handles response codes/retries and unmarshals returned JSON to a domain object.
* If the server returns a 200 response then the content is parsed as JSON and unmarshalled into the given object
* If the server returns a 404 response then None is returned
* If the server returns a 403 or a 400 then a failed future is returned
* If the server returns a 500, 502, 503 or 504 then the request is retried after (attempt*0.5) seconds up till 10 attempts
*
* @param req constructed akka HttpRequest to perform
* @param attempt attempt counter, you don't need to specify this when calling
* @tparam T the type of domain object to unmarshal the response into. There must be an io.circe.Decoder in-scope for this kind of object.
* if the unmarshalling fails then a failed Future is returned
* @return a Future containing an Option with either the unmarshalled domain object or None
*/
protected def callToPluto[T: io.circe.Decoder](req: HttpRequest, attempt: Int = 1): Future[Option[T]] = if (attempt > 10) {
Future.failed(new RuntimeException("Too many retries, see logs for details"))
} else {
logger.debug(s"PlutoCore request URL is ${req.uri.toString()}")
val checksumBytes = MessageDigest.getInstance("SHA-384").digest("".getBytes)
val checksumString = checksumBytes.map("%02x".format(_)).mkString
val queryPart = req.uri.rawQueryString.map(query => "?" + query).getOrElse("")
val messageTime = ZonedDateTime.now()
val contentType = if (req.entity.isKnownEmpty()) {
""
} else {
req.entity.contentType.mediaType.toString()
}
val token = HMAC.calculateHmac(
contentType,
checksumString,
method = req.method.value,
multiSlashRemover.replaceAllIn(req.uri.path.toString(), "/") + queryPart,
config.sharedSecret,
messageTime
)
if (token.isEmpty) {
Future.failed(new RuntimeException("could not build authorization"))
} else {
val auth: HttpHeader = RawHeader("Authorization", s"HMAC ${token.get}")
val checksum = RawHeader("Digest", s"SHA-384=$checksumString")
val date = RawHeader("Date", DateTimeFormatter.RFC_1123_DATE_TIME.format(messageTime))
val updatedReq = req.withHeaders(scala.collection.immutable.Seq(auth, date, checksum)) //add in the authorization header
val loggerContext = Option(MDC.getCopyOfContextMap)
callHttp
.singleRequest(updatedReq)
.flatMap(response => {
if(loggerContext.isDefined) MDC.setContextMap(loggerContext.get)
AkkaHttpHelpers.handleResponse(response, "PlutoCore")
})
.flatMap({
case Right(Some(stream))=>contentBodyToJson(consumeStream(stream.dataBytes))
case Right(None)=>Future(None)
case Left(RedirectRequired(newUri))=>
logger.info(s"Redirecting to $newUri")
callToPluto(req.withUri(newUri), attempt + 1)
case Left(RetryRequired)=>
Thread.sleep(500 * attempt)
callToPluto(req, attempt + 1)
})
}
}