app/lib/Dogpile.scala (25 lines of code) (raw):
package lib
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, Promise}
/**
* Requirements:
*
* 0. Only ONE scan must happen at any given time
* 1. When a scan is requested, a scan MUST take place at some point in the future.
* 2. A scan that is currently running does not count, a fresh scan must be done AFTERWARDS
* 3. Scan requests that can not be run immediately should be aggregated into a single request
*
* @param thing
* @tparam R the result type generated by the scan
*/
class Dogpile[R](thing: => Future[R]) {
/*
currently executing scan?
upcoming scan?
States:
* No scan running
* Scan running
* Scan running, another scan requested
OR
* Can run scan immediately
* Must wait (future already generated or not?)
Should stateRef store futures? We never want the result of the *currently running* future - but we DO
want a reference to the future of the *upcoming* scan, so that we can share it among requesters.
*/
sealed trait State {
val scanFuture: Future[R]
}
case class ScanRun(scanFuture: Future[R]) extends State
case class ScanQueued(scanFuture: Future[R]) extends State
private val stateRef: AtomicReference[State] = new AtomicReference(ScanRun(Future.failed(new IllegalStateException())))
/**
*
* @return a future for a run which has been initiated at or after this call
*/
def doAtLeastOneMore(): Future[R] = stateRef.updateAndGet { // TODO updateAndGet shouldn't handle side-effects
previousState =>
if (previousState.scanFuture.isCompleted) ScanRun(thing) else {
previousState match {
case ScanQueued(_) => previousState
case ScanRun(s) => {
val p = Promise[R]()
s.onComplete(_ => p.completeWith(thing))
ScanQueued(p.future)
}
}
}
}.scanFuture
}