riff-raff/app/ci/GreatestSoFar.scala (17 lines of code) (raw):
package ci
import rx.lang.scala.Observable
import scala.util.Random
import scala.concurrent.duration._
object GreatestSoFar {
def apply[T: Ordering](obs: Observable[T]): Observable[T] = {
val ord = implicitly[Ordering[T]]
obs.scan((prev, current) => if (ord.gt(current, prev)) current else prev)
}
}
object AtSomePointIn {
def apply[T](window: Duration)(act: => Observable[T]): Observable[T] = {
val kickOffTime =
Duration.create(Random.nextInt(window.toMillis.toInt) + 1, MILLISECONDS)
Observable.interval(kickOffTime).first.flatMap(_ => act)
}
}