app/utils/AkkaHttpHelpers.scala (69 lines of code) (raw):
package utils
import akka.http.scaladsl.model.{HttpEntity, HttpResponse, ResponseEntity, Uri}
import akka.stream.Materializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString
import org.slf4j.LoggerFactory
import scala.concurrent.{ExecutionContext, Future}
/**
* This object can be imported into communicator classes, and supplies routines to buffer the response entity into
* memory and to parse/unmarshal it as JSON
*/
object AkkaHttpHelpers {
private val logger = LoggerFactory.getLogger(getClass)
sealed trait HttpUnsuccessActions
case class RedirectRequired(to:Uri) extends HttpUnsuccessActions
case object RetryRequired extends HttpUnsuccessActions
/**
* Internal method that consumes a given response entity to a String
*
* @param entity ResponseEntity object
* @return a Future containing the String of the content
*/
def consumeResponseEntity(entity: ResponseEntity)(implicit mat:Materializer, ec:ExecutionContext) = {
val sink = Sink.reduce[ByteString]((acc, elem) => acc ++ elem)
entity.dataBytes.toMat(sink)(Keep.right).run().map(_.utf8String)
}
def consumeStream(stream:Source[ByteString, Any])(implicit mat:Materializer, ec:ExecutionContext) = {
val sink = Sink.reduce[ByteString]((acc, elem) => acc ++ elem)
stream.toMat(sink)(Keep.right).run().map(_.utf8String)
}
/**
* Convenience method that consumes a given response entity and parses it into a Json object
*
* @param entity ResponseEntity object
* @return A Future containing either the ParsingFailure error or the parsed Json object
*/
def consumeResponseEntityJson(entity: ResponseEntity)(implicit mat:Materializer, ec:ExecutionContext) = consumeResponseEntity(entity)
.map(io.circe.parser.parse)
def contentBodyToJson[T: io.circe.Decoder](contentBody: Future[String])(implicit mat:Materializer, ec:ExecutionContext) = contentBody
.map(io.circe.parser.parse)
.map(_.map(json => (json \\ "result").headOption.getOrElse(json))) //If the actual object is hidden under a "result" field take that
.map(_.flatMap(_.as[T]))
.map({
case Left(err) =>
logger.error(s"Problematic response: ${contentBody.value}")
throw new RuntimeException("Could not understand server response: ", err)
case Right(data) => Some(data)
})
/**
* Handle the HTTP response from akka. This will return different actions to take depending on the response code, all returned in a Future:
* - 200 => Returns a Right, containing a ByteString source of the response body all contained in a Future
* - 404 => Returns a Right, containing None
* - 403/401 => Returns a failed Future with an error message
* - 400 => Returns a failed Future with an error message
* - 301 => Retrieves the Location header from the response and returns a Left with a RedirectRequired value giving the location to redirect to
* - 50x => Returns a Left with a RetryRequired value
* @param response The HttpResponse to process
* @param description Descriptive string of the remote component being talked to, for logging
* @param mat Implicitly provided Materializer
* @param ec Implicitly provided ExecutionContext
* @return A Future indicating the action to take.
*/
def handleResponse(response:HttpResponse, description:String) (implicit mat:Materializer, ec:ExecutionContext):Future[Either[HttpUnsuccessActions, Option[HttpEntity]]] = response.status.intValue() match {
case 200 =>
Future(Right(Some(response.entity)))
case 404 =>
response.entity.discardBytes()
Future(Right(None))
case 403|401 =>
response.entity.discardBytes()
Future.failed(new RuntimeException(s"$description said permission denied."))
case 409 =>
consumeResponseEntity(response.entity)
.flatMap(body => Future.failed(new RuntimeException(s"$description returned a conflict error: $body")))
case 400 =>
consumeResponseEntity(response.entity)
.flatMap(body => Future.failed(new RuntimeException(s"$description returned bad data error: $body")))
case 301 |302|303|308|309=>
response.entity.discardBytes()
logger.info(s"Received unexpected redirect from $description to ${response.getHeader("Location")}")
val h = response.getHeader("Location")
if (h.isPresent) {
Future(Left(RedirectRequired(h.get().value())))
} else {
Future.failed(new RuntimeException(s"$description returned an Unexpected redirect without location"))
}
case 500 | 502 | 503 | 504 =>
consumeResponseEntity(response.entity).map(body=>{
logger.error(s"$description returned a server error ${response.status}: $body. Retrying...")
Left(RetryRequired)
})
case _=>
consumeResponseEntity(response.entity)
.flatMap(body=>{
logger.error(s"Received unexpected response ${response.status} from $description, with content $body")
Future.failed(new RuntimeException(s"$description returned unexpected response: ${response.status}"))
})
}
}