hq/app/utils/attempt/Attempt.scala (123 lines of code) (raw):
package utils.attempt
import java.util.{Timer, TimerTask}
import play.api.Logging
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try
/**
* Represents a value that will need to be calculated using an asynchronous
* computation that may fail.
*/
case class Attempt[A] private (underlying: Future[Either[FailedAttempt, A]]) extends Logging {
def map[B](f: A => B)(implicit ec: ExecutionContext): Attempt[B] =
flatMap(a => Attempt.Right(f(a)))
def flatMap[B](f: A => Attempt[B])(implicit ec: ExecutionContext): Attempt[B] = Attempt {
asFuture.flatMap {
case Right(a) => f(a).asFuture
case Left(e) => Future.successful(Left(e))
}
}
def fold[B](failure: FailedAttempt => B, success: A => B)(implicit ec: ExecutionContext): Future[B] = {
asFuture.map(_.fold(failure, success))
}
def map2[B, C](bAttempt: Attempt[B])(f: (A, B) => C)(implicit ec: ExecutionContext): Attempt[C] = {
for {
a <- this
b <- bAttempt
} yield f(a, b)
}
/**
* Perform a side effect when the attempt completes, while still passing the value through.
*
* This is most useful for logging at the edge of a program that needs to return an Attempt.
*/
def tap(sideEffect: Either[FailedAttempt, A] => Unit)(implicit ec: ExecutionContext): Attempt[A] = {
asFuture.foreach(sideEffect)
this
}
/**
* Discards the result type and returns unit if this attempt succeeded.
*/
def unit(implicit ec: ExecutionContext): Attempt[Unit] = {
map(_ => ())
}
/**
* If there is an error in the Future itself (e.g. a timeout) we convert it to a
* Left so we have a consistent error representation. Unfortunately, this means
* the error isn't being handled properly so we're left with just the information
* provided by the exception.
*
* Try to avoid hitting this method's failure case by always handling Future errors
* and creating a suitable failure instance for the problem.
*/
def asFuture(implicit ec: ExecutionContext): Future[Either[FailedAttempt, A]] = {
underlying recover { case err =>
val apiErrors = FailedAttempt(Failure(err.getMessage, "Unexpected error", 500, throwable = Some(err)))
logger.error(apiErrors.logMessage, apiErrors.firstException.orNull)
scala.Left(apiErrors)
}
}
def delay(delay: FiniteDuration)(implicit ec: ExecutionContext): Attempt[A] = {
flatMap(Attempt.Delayed(delay)(_))
}
}
object Attempt {
/**
* Changes generated `List[Attempt[A]]` to `Attempt[List[A]]` via provided
* traversal function (like `Future.traverse`).
*
* This implementation returns the first failure in the resulting list,
* or the successful result.
*/
def traverse[A, B](as: List[A])(f: A => Attempt[B])(implicit ec: ExecutionContext): Attempt[List[B]] = {
as.foldRight[Attempt[List[B]]](Right(Nil))(f(_).map2(_)(_ :: _))
}
/** Using the provided traversal function, sequence the resulting attempts
* to `Attempt[List[(A, B)]]` where the first element of each tuple is the
* value passed to the function f in order to to generate the second element
*
* This implementation returns the first failure in the resulting list,
* or the successful result.
*/
def labelledTraverse[A, B](as: List[A])(f: A => Attempt[B])(implicit ec: ExecutionContext): Attempt[List[(A, B)]] = {
Attempt.traverse(as)(a => f(a).map((a, _)))
}
/** Traverses the given list `List[A]` with the function f, A => Attempt[List[B]]` and flattens the generated result into Attempt[List[B]]`
* This implementation returns the first failure in the resulting list,
* or the successful result.
*/
def flatTraverse[A, B](as: List[A])(f: A => Attempt[List[B]])(implicit ec: ExecutionContext): Attempt[List[B]] = {
Attempt.traverse(as)(f).map(_.flatten)
}
/**
* Using the provided traversal function, sequence the resulting attempts
* into a list that preserves failures.
*
* This is useful if failure is acceptable in part of the application.
*/
def traverseWithFailures[A, B](as: List[A])(f: A => Attempt[B])(implicit ec: ExecutionContext): Attempt[List[Either[FailedAttempt, B]]] = {
sequenceWithFailures(as.map(f))
}
/**
* Using the provided traversal function, sequence the resulting attempts
* into a list that:
* - preserves failures (by keeping the resulting Either)
* - creates a tuple who's first element is the value passed to the traversal function f
*
* Combines the behaviours of labelledTraverse and traverseWithFailures.
*/
def labelledTraverseWithFailures[A, B](as: List[A])(f: A => Attempt[B])(implicit ec: ExecutionContext): Attempt[List[(A, Either[FailedAttempt, B])]] = {
Async.Right(Future.traverse(as)(a => f(a).asFuture.map(a -> _)))
}
/**
* Flattens the given list `List[Attempt[List[A]]]` to `Attempt[List[A]]`.
*/
def flatSequence[A](as: List[Attempt[List[A]]])(implicit ec: ExecutionContext): Attempt[List[A]] = {
flatTraverse(as)(identity)
}
/**
* As with `Future.sequence`, changes `List[Attempt[A]]` to `Attempt[List[A]]`.
*
* This implementation returns the first failure in the list, or the successful result.
*/
def sequence[A](responses: List[Attempt[A]])(implicit ec: ExecutionContext): Attempt[List[A]] = {
traverse(responses)(identity)
}
/**
* Sequence these attempts into a list that preserves failures.
*
* This is useful if failure is acceptable in part of the application.
*/
def sequenceWithFailures[A](attempts: List[Attempt[A]])(implicit ec: ExecutionContext): Attempt[List[Either[FailedAttempt, A]]] = {
Async.Right(Future.traverse(attempts)(_.asFuture))
}
def fromEither[A](e: Either[FailedAttempt, A]): Attempt[A] =
Attempt(Future.successful(e))
def fromOption[A](optA: Option[A], ifNone: FailedAttempt): Attempt[A] =
fromEither(optA.toRight(ifNone))
/**
* Convert a plain `Future` value to an attempt by providing a recovery handler.
*/
def fromFuture[A](future: Future[A])(recovery: PartialFunction[Throwable, FailedAttempt])(implicit ec: ExecutionContext): Attempt[A] = {
Attempt {
future
.map(scala.Right(_))
.recover { case t =>
scala.Left(recovery(t))
}
}
}
/**
* Discard failures from a list of attempts.
*
* **Use with caution**.
*/
def successfulAttempts[A](attempts: List[Attempt[A]])(implicit ec: ExecutionContext): Attempt[List[A]] = {
Attempt.Async.Right {
Future.traverse(attempts)(_.asFuture).map(_.collect { case Right(a) => a })
}
}
/**
* Create an Attempt instance from a "good" value.
*/
def Right[A](a: A): Attempt[A] =
Attempt(Future.successful(scala.Right(a)))
/**
* Create an Attempt failure from an Failure instance, representing the possibility of multiple failures.
*/
def Left[A](errs: FailedAttempt): Attempt[A] =
Attempt(Future.successful(scala.Left(errs)))
/**
* Syntax sugar to create an Attempt failure if there's only a single error.
*/
def Left[A](err: Failure): Attempt[A] =
Attempt(Future.successful(scala.Left(FailedAttempt(err))))
/**
* Asyncronous versions of the Attempt Right/Left helpers for when you have
* a Future that returns a good/bad value directly.
*/
object Async {
/**
* Create an Attempt from a Future of a good value.
*/
def Right[A](fa: Future[A])(implicit ec: ExecutionContext): Attempt[A] =
Attempt(fa.map(scala.Right(_)))
/**
* Create an Attempt from a known failure in the future. For example,
* if a piece of logic fails but you need to make a Database/API call to
* get the failure information.
*/
def Left[A](ferr: Future[FailedAttempt])(implicit ec: ExecutionContext): Attempt[A] =
Attempt(ferr.map(scala.Left(_)))
}
/**
* @see https://stackoverflow.com/questions/16359849/scala-scheduledfuture
**/
object Delayed {
private val timer = new Timer(true)
private def makeTask[A](body: => A)(schedule: TimerTask => Unit)(implicit ctx: ExecutionContext): Future[A] = {
val prom = Promise[A]()
schedule(
new TimerTask {
def run(): Unit = {
ctx.execute(
new Runnable {
def run(): Unit = {
prom.complete(Try(body))
}
}
)
}
}
)
prom.future
}
def apply[A](delay: FiniteDuration)(body: => A)(implicit ctx: ExecutionContext): Attempt[A] = {
Attempt.fromFuture(makeTask(body)(timer.schedule(_, delay.toMillis))) {
case th => Failure(th.getMessage, "Cannot execute the delayed task", 500).attempt
}
}
}
}