in app/pekko/gu/agent/Agent.scala [28:112]
/** Factory method for creating an [[Agent]] holding `initialValue`.
  *
  * @param initialValue the value the agent starts out with
  * @param context      execution context handed to the agent implementation
  * @tparam T           type of the value held by the agent
  * @return a new agent backed by the default (`SecretAgent`) implementation
  */
def apply[T](initialValue: T)(implicit context: ExecutionContext): Agent[T] = {
  val agent: Agent[T] = new SecretAgent[T](initialValue, context)
  agent
}
/** Java API: factory method for creating an [[Agent]].
  *
  * Takes the execution context as an ordinary argument and forwards to the
  * Scala `apply` factory.
  *
  * @param initialValue the value the agent starts out with
  * @param context      execution context handed to the agent implementation
  * @tparam T           type of the value held by the agent
  * @return a new agent initialised with `initialValue`
  */
def create[T](initialValue: T, context: ExecutionContext): Agent[T] =
  Agent.apply(initialValue)(context)
/** Default agent implementation.
  *
  * The current value lives in an STM `Ref`. Every update is submitted to
  * `updater`, an executor layered on top of the supplied `context`.
  * NOTE(review): `updater` presumably runs its tasks one at a time and in
  * submission order — confirm against `SerializedSuspendableExecutionContext`.
  *
  * @param initialValue value the agent starts with
  * @param context      execution context that `updater` delegates to
  */
private final class SecretAgent[T](initialValue: T, context: ExecutionContext)
    extends Agent[T] {

  // Holder of the agent's current value.
  private val ref = Ref(initialValue)

  // Executor through which all queued updates pass.
  // NOTE(review): the literal 10 is presumably a batch/throughput setting — confirm.
  private val updater = SerializedSuspendableExecutionContext(10)(context)

  /** Reads the current value directly from the underlying `Ref`. */
  def get(): T = ref.single.get

  /** Queues an update that replaces the current value with `newValue`. */
  def send(newValue: T): Unit =
    withinTransaction(() => ref.single.update(newValue))

  /** Queues an update that applies `f` to the current value. */
  def send(f: T => T): Unit =
    withinTransaction(() => ref.single.transform(f))

  /** Queues an update whose function `f` is executed on `ec` rather than on
    * `updater`. `updater` is suspended before the hand-off and resumed once
    * the transformation has finished, whether or not it succeeded.
    */
  def sendOff(f: T => T)(implicit ec: ExecutionContext): Unit = {
    // Runs on `ec`: apply the function, then always let `updater` continue.
    val applyOnEc: Runnable = () => {
      try ref.single.transform(f)
      finally updater.resume()
    }
    // Runs on `updater`: the hand-off to `ec` happens even if suspend() throws.
    val suspendAndHandOff: Runnable = () => {
      try updater.suspend()
      finally ec.execute(applyOnEc)
    }
    withinTransaction(suspendAndHandOff)
  }

  /** Queues an update to `newValue`; the returned future yields `newValue`
    * once the update has been applied.
    */
  def alter(newValue: T): Future[T] =
    doAlter {
      ref.single.update(newValue)
      newValue
    }

  /** Queues an update applying `f`; the returned future yields the new value. */
  def alter(f: T => T): Future[T] =
    doAlter(ref.single.transformAndGet(f))

  /** Like `alter(f)`, but `f` is evaluated on `ec`. `updater` is suspended
    * while the computation is outstanding and resumed when it finishes.
    */
  def alterOff(f: T => T)(implicit ec: ExecutionContext): Future[T] = {
    val promise = Promise[T]()
    val suspendAndCompute: Runnable = () => {
      updater.suspend()
      // The future body runs on the implicit `ec`; resume() is in `finally`
      // so `updater` continues even when `f` throws.
      val computed = Future {
        try ref.single.transformAndGet(f)
        finally updater.resume()
      }
      promise.completeWith(computed)
    }
    withinTransaction(suspendAndCompute)
    promise.future
  }

  /** Internal helper: submits `task` to `updater`. When called while a
    * transaction is current, submission is deferred to that transaction's
    * after-commit hook; otherwise the task is submitted immediately.
    */
  private final def withinTransaction(task: Runnable): Unit =
    Txn.findCurrent.fold(updater.execute(task)) { txn =>
      Txn.afterCommit(_ => updater.execute(task))(txn)
    }

  /** Internal helper: evaluates `compute` on `updater` and exposes the result
    * as a future. When called while a transaction is current, evaluation is
    * only scheduled from that transaction's after-commit hook.
    */
  private final def doAlter(compute: => T): Future[T] =
    Txn.findCurrent.fold(Future(compute)(updater)) { txn =>
      val promise = Promise[T]()
      Txn.afterCommit(_ => promise.completeWith(Future(compute)(updater)))(txn)
      promise.future
    }

  /** A future that reads the agent's value as a task on `updater`.
    * NOTE(review): presumably this observes all updates queued before the
    * call — depends on `updater` preserving submission order; confirm.
    */
  def future(): Future[T] = Future(ref.single.get)(updater)

  /** Creates a new agent seeded with `f` applied to the current value.
    * The new agent uses this agent's `updater` as its execution context.
    */
  def map[B](f: T => B): Agent[B] = Agent(f(get()))(updater)

  /** Returns the agent produced by applying `f` to the current value. */
  def flatMap[B](f: T => Agent[B]): Agent[B] = f(get())

  /** Applies `f` to the current value, discarding the result. */
  def foreach[U](f: T => U): Unit = f(get())
}