app/pekko/gu/agent/Agent.scala (82 lines of code) (raw):

/** This file is copied from the Akka project <https://github.com/akka/akka>. It
  * has been included here as Agents are now deprecated within Akka. It contains
  * modifications from the original source. License information can be seen in
  * the project LICENSE file.
  *
  * The original source from which this file was copied can be found here
  * <https://github.com/akka/akka/blob/release-2.4/akka-agent/src/main/scala/akka/agent/Agent.scala>
  *
  * Modifications:
  *   - unicode arrow `⇒` is deprecated, use `=>` instead
  *   - Auto-application to `()` is deprecated. Supply the empty argument list
  *     `()` explicitly to invoke method
  *   - `sendOff` resumes the serialized updater when the supplied
  *     `ExecutionContext` fails to accept the off-pool task
  *   - Scaladoc refers to the Pekko `Mapper`/`Foreach` classes instead of the
  *     Akka ones
  */

/** Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
  */
package org.apache.pekko.agent

import scala.concurrent.stm._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.control.NonFatal
import org.apache.pekko.util.{SerializedSuspendableExecutionContext}

object Agent {

  /** Factory method for creating an Agent.
    *
    * @param initialValue
    *   the state the agent starts with
    * @param context
    *   execution context on which the agent's queued updates are run
    */
  def apply[T](initialValue: T)(implicit context: ExecutionContext): Agent[T] =
    new SecretAgent(initialValue, context)

  /** Java API: Factory method for creating an Agent. */
  def create[T](initialValue: T, context: ExecutionContext): Agent[T] =
    Agent(initialValue)(context)

  /** Default agent implementation.
    *
    * The state is held in an STM `Ref`. Every update is submitted to `updater`,
    * a `SerializedSuspendableExecutionContext` layered on top of `context`.
    */
  private final class SecretAgent[T](initialValue: T, context: ExecutionContext) extends Agent[T] {
    private val ref = Ref(initialValue)

    // All updates go through this executor. The literal 10 is its first
    // argument (presumably the per-run throughput; confirm against
    // SerializedSuspendableExecutionContext).
    private val updater = SerializedSuspendableExecutionContext(10)(context)

    def get(): T = ref.single.get

    def send(newValue: T): Unit =
      withinTransaction(new Runnable {
        def run(): Unit = ref.single.update(newValue)
      })

    def send(f: T => T): Unit =
      withinTransaction(new Runnable {
        def run(): Unit = ref.single.transform(f)
      })

    def sendOff(f: T => T)(implicit ec: ExecutionContext): Unit =
      withinTransaction(new Runnable {
        def run(): Unit = {
          // Runs on `ec`; resumes the updater once the update has been applied
          // (or has failed).
          val offPoolUpdate = new Runnable {
            def run(): Unit =
              try ref.single.transform(f)
              finally updater.resume()
          }
          // Suspend the updater so later dispatches wait for the off-pool task.
          try updater.suspend()
          finally {
            try ec.execute(offPoolUpdate)
            catch {
              case NonFatal(cause) =>
                // `execute` failed, so the off-pool task may never reach its
                // own `resume()`. Resume here; otherwise the updater would stay
                // suspended and every later dispatch on this agent would stall.
                updater.resume()
                throw cause
            }
          }
        }
      })

    def alter(newValue: T): Future[T] =
      doAlter { ref.single.update(newValue); newValue }

    def alter(f: T => T): Future[T] =
      doAlter(ref.single.transformAndGet(f))

    def alterOff(f: T => T)(implicit ec: ExecutionContext): Future[T] = {
      val result = Promise[T]()
      withinTransaction(new Runnable {
        def run(): Unit = {
          updater.suspend()
          // NOTE(review): `resume()` is only called inside the Future body. If
          // `ec` never runs that body the updater stays suspended; `sendOff`
          // guards the equivalent case, consider doing the same here.
          result.completeWith(
            Future(
              try ref.single.transformAndGet(f)
              finally updater.resume()
            )
          )
        }
      })
      result.future
    }

    /** Internal helper method: submits `run` to the updater, deferring the
      * submission until after commit when called inside an STM transaction.
      */
    private def withinTransaction(run: Runnable): Unit =
      Txn.findCurrent match {
        case Some(txn) => Txn.afterCommit(_ => updater.execute(run))(txn)
        case None      => updater.execute(run)
      }

    /** Internal helper method: evaluates `f` on the updater and exposes the
      * outcome as a Future. Inside an STM transaction the evaluation is only
      * scheduled after commit.
      */
    private def doAlter(f: => T): Future[T] =
      Txn.findCurrent match {
        case Some(txn) =>
          val result = Promise[T]()
          Txn.afterCommit(_ => result.completeWith(Future(f)(updater)))(txn)
          result.future
        case None =>
          Future(f)(updater)
      }

    def future(): Future[T] = Future(ref.single.get)(updater)

    def map[B](f: T => B): Agent[B] = Agent(f(get()))(updater)

    def flatMap[B](f: T => Agent[B]): Agent[B] = f(get())

    def foreach[U](f: T => U): Unit = f(get())
  }
}

/** The Agent class was inspired by agents in Clojure.
  *
  * Agents provide asynchronous change of individual locations. Agents are bound
  * to a single storage location for their lifetime, and only allow mutation of
  * that location (to a new state) to occur as a result of an action. Update
  * actions are functions that are asynchronously applied to the Agent's state
  * and whose return value becomes the Agent's new state. The state of an Agent
  * should be immutable.
  *
  * While updates to Agents are asynchronous, the state of an Agent is always
  * immediately available for reading by any thread (using ''get'' or ''apply'')
  * without any messages.
  *
  * Agents are reactive. The update actions of all Agents get interleaved
  * amongst threads in a thread pool. At any point in time, at most one ''send''
  * action for each Agent is being executed. Actions dispatched to an agent from
  * another thread will occur in the order they were sent, potentially
  * interleaved with actions dispatched to the same agent from other sources.
  *
  * Example of usage:
  * {{{
  * val agent = Agent(5)
  *
  * agent send (_ * 2)
  *
  * ...
  *
  * val result = agent()
  * // use result ...
  *
  * }}}
  *
  * Agent is also monadic, which means that you can compose operations using
  * for-comprehensions. In monadic usage the original agents are not touched but
  * new agents are created. So the old values (agents) are still available
  * as-is. They are so-called 'persistent'.
  *
  * Example of monadic usage:
  * {{{
  * val agent1 = Agent(3)
  * val agent2 = Agent(5)
  *
  * for (value <- agent1) {
  *   result = value + 1
  * }
  *
  * val agent3 = for (value <- agent1) yield value + 1
  *
  * val agent4 = for {
  *   value1 <- agent1
  *   value2 <- agent2
  * } yield value1 + value2
  *
  * }}}
  *
  * ==DEPRECATED STM SUPPORT==
  *
  * Agents participating in enclosing STM transaction is a deprecated feature in
  * 2.3.
  *
  * If an Agent is used within an enclosing transaction, then it will
  * participate in that transaction. Agents are integrated with the STM - any
  * dispatches made in a transaction are held until that transaction commits,
  * and are discarded if it is retried or aborted.
  */
abstract class Agent[T] {

  /** Java API: Read the internal state of the agent. */
  def get(): T

  /** Read the internal state of the agent. */
  def apply(): T = get()

  /** Dispatch a new value for the internal state. Behaves the same as sending a
    * function (x => newValue).
    */
  def send(newValue: T): Unit

  /** Dispatch a function to update the internal state. In Java, pass in an
    * instance of `org.apache.pekko.dispatch.Mapper`.
    */
  def send(f: T => T): Unit

  /** Dispatch a function to update the internal state but on its own thread.
    * This does not use the reactive thread pool and can be used for
    * long-running or blocking operations. Dispatches using either `sendOff` or
    * `send` will still be executed in order. In Java, pass in an instance of
    * `org.apache.pekko.dispatch.Mapper`.
    */
  def sendOff(f: T => T)(implicit ec: ExecutionContext): Unit

  /** Dispatch an update to the internal state, and return a Future where that
    * new state can be obtained.
    */
  def alter(newValue: T): Future[T]

  /** Dispatch a function to update the internal state, and return a Future
    * where that new state can be obtained. In Java, pass in an instance of
    * `org.apache.pekko.dispatch.Mapper`.
    */
  def alter(f: T => T): Future[T]

  /** Dispatch a function to update the internal state but on its own thread,
    * and return a Future where that new state can be obtained. This does not
    * use the reactive thread pool and can be used for long-running or blocking
    * operations. Dispatches using either `alterOff` or `alter` will still be
    * executed in order. In Java, pass in an instance of
    * `org.apache.pekko.dispatch.Mapper`.
    */
  def alterOff(f: T => T)(implicit ec: ExecutionContext): Future[T]

  /** A future to the current value that will be completed after any currently
    * queued updates.
    */
  def future(): Future[T]

  /** Map this agent to a new agent, applying the function to the internal
    * state. Does not change the value of this agent. In Java, pass in an
    * instance of `org.apache.pekko.dispatch.Mapper`.
    */
  def map[B](f: T => B): Agent[B]

  /** Flatmap this agent to a new agent, applying the function to the internal
    * state. Does not change the value of this agent. In Java, pass in an
    * instance of `org.apache.pekko.dispatch.Mapper`.
    */
  def flatMap[B](f: T => Agent[B]): Agent[B]

  /** Applies the function to the internal state. Does not change the value of
    * this agent. In Java, pass in an instance of
    * `org.apache.pekko.dispatch.Foreach`.
    */
  def foreach[U](f: T => U): Unit
}