app/services/RabbitMqPropagator.scala (76 lines of code) (raw):
package services
import java.util.UUID
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo}
import com.google.inject.Inject
import com.rabbitmq.client.AMQP
import com.rabbitmq.client.AMQP.Exchange
import javax.inject.Singleton
import org.slf4j.LoggerFactory
import play.api.libs.json.Json.JsValueWrapper
import play.api.libs.json.{JsValue, Json, Writes}
import play.api.{Configuration, Logger}
/**
  * Companion holding the event types that can be sent to the [[RabbitMqPropagator]] actor.
  */
object RabbitMqPropagator {
  private val logger = LoggerFactory.getLogger(getClass)

  /** Base trait (it declares no members) extended by the events defined in this object. */
  trait RabbitMqEvent {
  }

  object ChangeEvent {
    /**
      * Convenience constructor that renders `content` into a JSON array string before building the event.
      *
      * @param content   values to be wrapped into a single JSON array
      * @param itemType  optional name of the kind of item that changed
      * @param operation the kind of change being reported
      * @param uuid      identifier for the event; a random UUID is generated when not supplied
      * @return a [[ChangeEvent]] whose `json` field is the stringified JSON array of `content`
      */
    def apply(content: Seq[JsValueWrapper],
              itemType: Option[String],
              operation: ChangeOperation,
              uuid: UUID = UUID.randomUUID()): ChangeEvent =
      new ChangeEvent(Json.stringify(Json.arr(content: _*)), itemType, operation, uuid)
  }

  /**
    * A change notification carrying an already-rendered JSON payload.
    *
    * @param json      the payload, as a JSON string
    * @param itemType  optional name of the kind of item that changed
    * @param operation the kind of change being reported
    * @param uuid      identifier for the event
    */
  case class ChangeEvent(json: String, itemType: Option[String], operation: ChangeOperation, uuid: UUID)
    extends RabbitMqEvent with JacksonSerializable
}
/**
  * Propagates model changes to RabbitMQ for others to consume.
  *
  * The actor accepts [[RabbitMqPropagator.ChangeEvent]] messages and publishes each event's JSON payload to the
  * configured topic exchange, using a routing key of the form `<route-base>.<itemType>.<operation>`.
  *
  * Configuration keys read:
  *  - `rabbitmq.uri` (required): connection URI handed to the RabbitMQ connection factory
  *  - `rabbitmq.route-base` (optional, defaults to "core"): first segment of the routing key
  *  - `rabbitmq.exchange` (optional, defaults to "pluto-core"): name of the topic exchange to declare and publish to
  *
  * @param configuration application configuration supplying the keys above
  * @param system        actor system in which the RabbitMQ connection actor is created
  */
@Singleton
class RabbitMqPropagator @Inject()(configuration: Configuration, system: ActorSystem) extends Actor {
  import RabbitMqPropagator._
  import com.newmotion.akka.rabbitmq._
  import java.nio.charset.StandardCharsets
  import scala.concurrent.duration._

  val logger: Logger = Logger(getClass)

  // Routing configuration is read BEFORE any RabbitMQ actors are started. `channelSetup` reads `rmqExchange` and is
  // handed to the channel actor as a callback, so the value must already be initialised when the channel actor is
  // created; otherwise the callback could observe a not-yet-assigned (null) exchange name.
  val rmqRouteBase: String = configuration.getOptional[String]("rabbitmq.route-base").getOrElse("core")
  val rmqExchange: String = configuration.getOptional[String]("rabbitmq.exchange").getOrElse("pluto-core")

  val rmqFactory = new ConnectionFactory()
  rmqFactory.setUri(configuration.get[String]("rabbitmq.uri"))

  val rmqConnection: ActorRef =
    system.actorOf(ConnectionActor.props(rmqFactory, reconnectionDelay = 10.seconds), "pluto-core")
  val rmqChannel: ActorRef = rmqConnection.createChannel(ChannelActor.props(channelSetup))

  /**
    * Channel setup callback: declares the configured exchange as a "topic" exchange.
    *
    * @param channel the RabbitMQ channel to set up
    * @param self    actor reference supplied by the caller of the callback (unused here)
    * @return the broker's confirmation of the exchange declaration
    */
  def channelSetup(channel: Channel, self: ActorRef): Exchange.DeclareOk = {
    channel.exchangeDeclare(rmqExchange, "topic")
  }

  /**
    * Builds message properties in line with what storagetier etc. will expect: a freshly randomised message ID,
    * a JSON content type and a UTF-8 content encoding.
    *
    * @return the properties to attach to a published message
    */
  private def buildProps(): AMQP.BasicProperties = {
    new AMQP.BasicProperties.Builder()
      .messageId(UUID.randomUUID().toString) // create a new randomised message ID
      .contentType("application/json")
      .contentEncoding("UTF-8")
      .build()
  }

  override def receive: Receive = {
    /**
      * Publish a change event. The routing key is built from the configured route base, the event's item type and
      * its operation. The sender receives `Status.Success` as soon as the publish request has been handed to the
      * channel actor (not when the broker has accepted it), or `Status.Failure` if the event has no item type.
      */
    case ev: ChangeEvent =>
      ev.itemType match {
        case Some(itemtype) =>
          val route = s"$rmqRouteBase.$itemtype.${operationPath(ev.operation)}"
          // Encode explicitly as UTF-8 so the bytes always agree with the "UTF-8" content encoding declared in
          // buildProps(), regardless of the JVM's default charset.
          val body = ev.json.getBytes(StandardCharsets.UTF_8)
          rmqChannel ! ChannelMessage(
            channel => channel.basicPublish(rmqExchange, route, buildProps(), body),
            dropIfNoChannel = false
          )
          sender() ! akka.actor.Status.Success(())
        case None =>
          logger.error("Unknown object type for rabbitmq propagation")
          sender() ! akka.actor.Status.Failure(new IllegalArgumentException("Unknown object type for rabbitmq propagation"))
      }

    // Anything else is not a message this actor understands; it is logged and otherwise ignored (no reply is sent).
    case other: Any =>
      logger.error(s"RabbitMQPropagator got an unexpected message: ${other}")
  }

  /**
    * Maps a change operation onto the final segment of the routing key.
    *
    * @param operation the operation carried by the event
    * @return "create" for a [[CreateOperation]], "update" for an [[UpdateOperation]]
    */
  def operationPath(operation: ChangeOperation): String = operation match {
    case CreateOperation() => "create"
    case UpdateOperation() => "update"
  }
}
// Annotations to tell Jackson Databind how to handle the ChangeOperation hierarchy: the concrete subtype is
// identified by name ("create" or "update") under the "change-operation" property.
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "change-operation")
@JsonSubTypes(Array(
  new JsonSubTypes.Type(value = classOf[CreateOperation], name = "create"),
  new JsonSubTypes.Type(value = classOf[UpdateOperation], name = "update")
))
sealed trait ChangeOperation extends JacksonSerializable

/** Change operation registered with Jackson under the subtype name "create". */
case class CreateOperation() extends ChangeOperation

/** Change operation registered with Jackson under the subtype name "update". */
case class UpdateOperation() extends ChangeOperation