app/services/SQSQueue.scala (55 lines of code) (raw):

package services import java.util.concurrent.atomic.AtomicBoolean import com.amazonaws.services.sqs.model._ import play.api.Logging import scala.jdk.CollectionConverters._ import scala.annotation.tailrec import scala.util.control.NonFatal class SQSQueue(val queueName: String) { lazy val queueUrl = { val queueNameLookupResponse = SQS.SQSClient.getQueueUrl(new GetQueueUrlRequest(queueName)) queueNameLookupResponse.getQueueUrl } def pollMessages(messageCount: Int, waitTimeSeconds: Int) = { val response = SQS.SQSClient.receiveMessage( new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(waitTimeSeconds).withMaxNumberOfMessages(messageCount) ) response.getMessages.asScala.toList } def deleteMessage(message: Message): Unit = { SQS.SQSClient.deleteMessage( new DeleteMessageRequest(queueUrl, message.getReceiptHandle) ) } def postMessage(message: String, delaySeconds: Int = 0): Unit = { SQS.SQSClient.sendMessage( new SendMessageRequest() .withQueueUrl(queueUrl) .withMessageBody(message) .withDelaySeconds(delaySeconds) ) } } trait SQSQueueConsumer extends Logging { def queue: SQSQueue def processMessage(message: Message): Unit val running = new AtomicBoolean(true) def stop = running.set(false) @tailrec final def run(): Unit = { if(running.get()) { try { for(message <- queue.pollMessages(1, 5)) { // only grab one message, do the rest of the cluster gets a chance logger.debug(s"processing message form queue ${queue.queueName}") processMessage(message) queue.deleteMessage(message) } } catch { case NonFatal(e) => { logger.error(s"error processing messages from job queue ${queue.queueName}", e) } } run() } } }