app/services/RabbitMqMatrix.scala (74 lines of code) (raw):

package services import akka.actor.{Actor, ActorRef, ActorSystem} import com.google.inject.Inject import com.rabbitmq.client.AMQP.{Exchange, BasicProperties} import mes.OnlineOutputMessage import javax.inject.Singleton import org.slf4j.LoggerFactory import play.api.{Configuration, Logger} import java.util.UUID object RabbitMqMatrix { private val logger = LoggerFactory.getLogger(getClass) trait RabbitMqMatrixEvent { } object MatrixEvent { def apply(message: OnlineOutputMessage): MatrixEvent = { new MatrixEvent(message) } } case class MatrixEvent(message: OnlineOutputMessage) extends RabbitMqMatrixEvent } @Singleton class RabbitMqMatrix @Inject()(configuration:Configuration, system:ActorSystem) extends Actor { import RabbitMqMatrix._ import com.newmotion.akka.rabbitmq._ import scala.concurrent.duration._ val logger: Logger = Logger(getClass) val rmqFactory = new ConnectionFactory() rmqFactory.setUri(configuration.get[String]("rabbitmq.uri")) val rmqConnection: ActorRef = system.actorOf(ConnectionActor.props(rmqFactory, reconnectionDelay = 10.seconds), "pluto-core-matrix") val rmqChannel: ActorRef = rmqConnection.createChannel(ChannelActor.props(channelSetup)) val rmqRouteBase = configuration.getOptional[String]("rabbitmq.matrix.route-base").getOrElse("storagetier.restorer.media_not_required.nearline") val rmqExchange = configuration.getOptional[String]("rabbitmq.matrix.exchange").getOrElse("storagetier-project-restorer") def channelSetup(channel: Channel, self: ActorRef): Exchange.DeclareOk = { channel.exchangeDeclare(rmqExchange, "topic", true) } override def receive: Receive = { case event:MatrixEvent => logger.info(s"RabbitMqMatrix is attempting to send a message to the queue.") var originalPath = "" if (event.message.originalFilePath.isDefined) { originalPath = event.message.originalFilePath.get } var fileSize: Long = 0 if (event.message.fileSize.isDefined) { fileSize = event.message.fileSize.get } var nearlineId = "" if (event.message.nearlineId.isDefined) { nearlineId = event.message.nearlineId.get } var projectIdsString = s"[" event.message.projectIds.foreach(project => { projectIdsString = s"""$projectIdsString"$project",""" }) var vidispineItemId = "" if (event.message.vidispineItemId.isDefined) { vidispineItemId = event.message.vidispineItemId.get } projectIdsString = projectIdsString.dropRight(1) projectIdsString = s"$projectIdsString]" val messageToSend: String = s"""{"mediaTier":"${event.message.mediaTier}","projectIds":${projectIdsString},"originalFilePath":"$originalPath","fileSize":$fileSize,"vidispineItemId":"$vidispineItemId","nearlineId":"${nearlineId}","mediaCategory":"${event.message.mediaCategory}","forceDelete":true}""" val msgProps = new BasicProperties.Builder() .contentType("application/json") .contentEncoding("UTF-8") .messageId(UUID.randomUUID().toString) .build() rmqChannel ! ChannelMessage(channel => channel.basicPublish(rmqExchange, rmqRouteBase, msgProps, messageToSend.getBytes), dropIfNoChannel = false) case other:Any=> logger.error(s"RabbitMqMatrix got an unexpected input: ${other}") case _=> logger.error(s"RabbitMqMatrix got an unexpected input.") } }