app/services/AtomResponderReceiver.scala (143 lines of code) (raw):

package services import akka.actor.{ActorRef, ActorSystem} import com.newmotion.akka.rabbitmq._ import com.rabbitmq.client.AMQP.Exchange import com.rabbitmq.client.{AMQP, ShutdownSignalException} import models.{PlutoCommission, PlutoCommissionRow} import org.slf4j.LoggerFactory import play.api.Configuration import javax.inject.{Inject, Singleton} import scala.jdk.CollectionConverters._ import scala.util.{Failure, Success, Try} import org.apache.commons.codec.binary.StringUtils import play.api.db.slick.DatabaseConfigProvider import slick.jdbc.PostgresProfile import scala.concurrent.duration._ import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global import slick.jdbc.PostgresProfile.api._ import slick.lifted.TableQuery case class MissingCommission(id: Int) extends Object @Singleton class AtomResponderReceiver @Inject()(config:Configuration, dbConfigProvider:DatabaseConfigProvider)(implicit system:ActorSystem) { import io.circe.generic.auto._ private val logger = LoggerFactory.getLogger(getClass) private val factory = new ConnectionFactory() logger.debug("Got Rabbit MQ connection factory") factory.setUri(config.get[String]("rabbitmq.uri")) val exchangeName = config.getOptional[String]("rabbitmq.atomresponder_exchange").getOrElse("pluto-atomresponder") private val connection = system.actorOf(ConnectionActor.props(factory), "atomresponder-receiver-connection") logger.debug("Got Rabbit MQ connection actor, requesting subscriber") connection ! CreateChannel(ChannelActor.props(setupSubscriber), Some("plutocore-atomresponder-channel")) private implicit val db = dbConfigProvider.get[PostgresProfile].db val rmqExchange = config.getOptional[String]("rabbitmq.exchange").getOrElse("pluto-core") def channelSetup(channel: Channel, self: ActorRef): Exchange.DeclareOk = { channel.exchangeDeclare(rmqExchange, "topic") } val rmqFactory = new ConnectionFactory() rmqFactory.setUri(config.get[String]("rabbitmq.uri")) val rmqConnection: ActorRef = system.actorOf(ConnectionActor.props(rmqFactory, reconnectionDelay = 10.seconds), "pluto-core-two") val rmqChannel: ActorRef = rmqConnection.createChannel(ChannelActor.props(channelSetup)) val rmqRouteBase = config.getOptional[String]("rabbitmq.route-base").getOrElse("core") def loadEvent(body:Array[Byte]) = { for { bodyAsString <- Try { StringUtils.newStringUtf8(body)}.toEither parsedContent <- io.circe.parser.parse(bodyAsString) marshalledContent <- parsedContent.as[MissingCommission] } yield marshalledContent } def getCommission(commissionId: Option[Int]):Future[Option[PlutoCommission]] = { commissionId match { case None=>Future(None) case Some(commId)=> db.run( TableQuery[PlutoCommissionRow].filter(_.id===commId).result.asTry ).map({ case Success(matchingEntries)=>matchingEntries.headOption case Failure(error)=>throw error }) } } def publishData(messageType: String, plutoCom: PlutoCommission, commissionId: Int) = { val route = s"$rmqRouteBase.commission.${messageType}" var messageToSend: String = s"""[{"id":$commissionId}]""" if (plutoCom != null) { messageToSend = s"""[{"id":${plutoCom.id.get},"created":"${plutoCom.created.toString}","updated":"${plutoCom.updated.toString}","title":"${plutoCom.title}","status":"${plutoCom.status}","description":"${plutoCom.description}","workingGroupId":${plutoCom.workingGroup},"scheduledCompletion":"${plutoCom.scheduledCompletion.toString}","owner":"${plutoCom.owner}","productionOffice":"${plutoCom.productionOffice}"}]""" } rmqChannel ! ChannelMessage(channel => channel.basicPublish(rmqExchange, route, null, messageToSend.getBytes), dropIfNoChannel = false) } def makeConsumer(channel:Channel): DefaultConsumer = { new DefaultConsumer(channel) { override def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException): Unit = super.handleShutdownSignal(consumerTag, sig) override def handleCancel(consumerTag: String): Unit = super.handleCancel(consumerTag) override def handleCancelOk(consumerTag: String): Unit = super.handleCancelOk(consumerTag) override def handleConsumeOk(consumerTag: String): Unit = super.handleConsumeOk(consumerTag) override def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]): Unit = { loadEvent(body) match { case Left(err)=> logger.error(s"Received invalid message with key ${envelope.getRoutingKey} from exchange ${envelope.getExchange}: ${err.getMessage}", err) case Right(messageData)=> handleEvent(messageData).map({ case true=> channel.basicAck(envelope.getDeliveryTag, false) case false=> logger.warn("Could not handle message, leaving it on-queue") channel.basicNack(envelope.getDeliveryTag, false, true) }) } } def handleEvent(messageData: MissingCommission): Future[Boolean] = { if (messageData.id.isValidInt) { logger.debug(s"Missing commission id. is ${messageData.id}") val commissionDataObject = getCommission(Some(messageData.id)) commissionDataObject.onComplete({ case Success(commissionData) => { commissionData match { case Some(PlutoCommission(id, collectionId, siteId, created, updated, title, status, description, workingGroup, originalCommissionerName, scheduledCompletion, owner, notes, productionOffice, originalTitle, googleFolder, confidential)) => { logger.debug(s"Found a commission.") publishData("update", commissionData.get, messageData.id) } case None => { logger.debug(s"No commission found.") publishData("delete", null, messageData.id) } } } case Failure(exception) => { logger.info(s"${exception}") } }) Future(true) } else Future(false) } override def handleRecoverOk(consumerTag: String): Unit = super.handleRecoverOk(consumerTag) } } def setupSubscriber(channel:Channel, self:ActorRef) = { logger.debug(s"Setting up Rabbit MQ subscriber") logger.debug(s"Exchange name is $exchangeName") val maybeQueue = Try { channel .queueDeclare("missing-commissions", false, false, false, null) .getQueue } maybeQueue match { case Success(queue) => logger.debug("Binding to exchange....") channel.queueBind(queue, exchangeName, "atomresponder.commission.#") logger.debug("Initiating consumer...") channel.basicConsume(queue, makeConsumer(channel)) logger.debug("Setup done") case Failure(err) => logger.error(s"Could not declare queue, terminating: ${err.getMessage}", err) system .terminate() .andThen({ case _=> Thread.sleep(2000) logger.error(s"pluto-core terminated because the setup of atomresponder-receiver queue in Rabbit MQ was incorrect. Try deleting it and allowing pluto-core to startup again.") logger.error(s"Actual exception was: ${err.getMessage}", err) }) } } }