in app/services/GenericSqsActor.scala [64:111]
def handleGeneric(msg: SQSMsg):Unit = msg match {
//dispatched to pull all messages off the queue. This "recurses" by dispatching itself if there are messages left on the queue.
case HandleNextSqsMessage(rq:ReceiveMessageRequest)=>
val result = sqsClient.receiveMessage(rq)
msgList = msgList ++ result.getMessages.asScala.map(m=>(m, rq.getQueueUrl))
if(isReady && msgList.nonEmpty) ownRef ! ReadyForNextMessage
if(msgList.isEmpty) {
sender() ! Status.Success
}
case ReadyForNextMessage=> //sent by the subclass to indicate that it is ready for more content
msgList.headOption match {
case Some((msg, queueUrl)) =>
isReady = false
msgList = msgList.tail
logger.debug(s"Received message ${msg.getMessageId}:")
logger.debug(s"\tAttributes: ${msg.getAttributes.asScala}")
logger.debug(s"\tReceipt Handle: ${msg.getReceiptHandle}")
logger.debug(s"\tBody: ${msg.getBody}")
convertMessageBody(msg.getBody) match {
case Left(err) =>
logger.error(s"Could not decode message from queue: $err")
logger.error(s"Message was ${msg.getBody}")
sender() ! Status.Failure
case Right(finalMsg) =>
ownRef ! HandleDomainMessage(finalMsg, queueUrl, msg.getReceiptHandle)
}
case None =>
logger.info(s"No more messages to consume")
isReady = true
ownRef ! CheckForNotifications
}
case CheckForNotifications=>
logger.debug("CheckForNotifications")
val rq = new ReceiveMessageRequest().withQueueUrl(notificationsQueue)
.withWaitTimeSeconds(10)
.withMaxNumberOfMessages(10)
if(notificationsQueue=="queueUrl"){
logger.warn("notifications queue not set up in applications.conf")
sender() ! Status.Failure
} else {
ownRef ! HandleNextSqsMessage(rq)
}
}