app/services/FileMoveQueue.scala (129 lines of code) (raw):

package services import akka.actor.Status.Success import akka.actor.{ActorRef, ActorSystem} import akka.stream.Materializer import com.amazonaws.services.sqs.AmazonSQS import com.amazonaws.services.sqs.model.{DeleteMessageRequest, SendMessageRequest} import com.theguardian.multimedia.archivehunter.common.clientManagers.SQSClientManager import com.theguardian.multimedia.archivehunter.common.cmn_models.ScanTargetDAO import io.circe import io.circe.generic.auto._ import io.circe.syntax._ import org.slf4j.LoggerFactory import play.api.Configuration import services.FileMoveActor.MoveFile import javax.inject.{Inject, Named, Singleton} import scala.concurrent.ExecutionContext import scala.util.Try case class FileMoveMessage(fileId:String, toCollection:String) object FileMoveQueue { sealed trait FileMoveMsg sealed trait FileMoveResponse //responses final case class EnqueuedOk(fileId:String) extends FileMoveResponse final case class EnqueuedProblem(fileId:String,problem:String) extends FileMoveResponse /** * used for debugging, in response to GetIdleState. Returns the current idle state of the actor. * @param idle if the actor is idle or not */ final case class CurrentIdleState(idle:Boolean) extends FileMoveResponse //requests /** * Sent from a Controller to push a message onto the external SQS queue. Returned EnqueuedOk once message has been enqueued * or EnqueuedProblem if it could not be * @param fileId file ID to move * @param toCollection collection name to move it to * @param uid user ID of the requesting user */ final case class EnqueueMove(fileId:String, toCollection:String, uid:String) extends FileMoveMsg /** * Dispatches "CheckForNotifications" only if the processor is in an idle state, otherwise ignored */ final case object CheckForNotificationsIfIdle extends FileMoveMsg /** * Updates the "idle" flag to the given state */ final case object InternalMarkDone extends FileMoveMsg final case object InternalMarkBusy extends FileMoveMsg /** * Request the current value of the 'Idle' flag. Used for testing and debugging. * Passes back a message of form CurrentIdleState. */ final case object GetIdleState extends FileMoveMsg } /** * The FileMoveQueue actor sits atop the FileMoveActor, and passes requests for file-moves out to an SQS queue. * It also receives requests in from the queue, based on a timed poll message from ClockSingleton. It will retrieve messages * and request moves for them, provided that there is no move currently in progress. * The actor handles its own mutable state via InternalSetIdleState() * @param config app configuration * @param sqsClientMgr SQSClientManager instance for getting hold of an SQS client * @param actorSystem akka actor system * @param materializer akka materializer * @param scanTargetDAO ScanTargetDAO instance, needed for validating the destination collection name * @param fileMoveActor reference to the FileMoveActor */ @Singleton class FileMoveQueue @Inject()(config:Configuration, sqsClientMgr: SQSClientManager, actorSystem:ActorSystem, materializer:Materializer, scanTargetDAO: ScanTargetDAO, @Named("fileMoveActor") fileMoveActor:ActorRef) extends GenericSqsActor[FileMoveMessage] { import GenericSqsActor._ import FileMoveQueue._ private val logger = LoggerFactory.getLogger(getClass) private var countInProgress = 0 override protected val sqsClient: AmazonSQS = sqsClientMgr.getClient(config.getOptional[String]("externalData.awsProfile")) override protected implicit val implSystem: ActorSystem = actorSystem override protected implicit val mat: Materializer = materializer override protected val notificationsQueue: String = config.get[String]("filemover.notificationsQueue") override protected val ownRef: ActorRef = self override protected implicit val ec: ExecutionContext = actorSystem.dispatcher override def convertMessageBody(body: String): Either[circe.Error, FileMoveMessage] = { io.circe.parser.parse(body).flatMap(_.as[FileMoveMessage]) } /** * `concurrency` gives a limit of the number of copies that can take place at one time * @return the configured concurrency or default value of 5 */ def concurrency:Int = config.getOptional[Int]("filemover.concurrency").getOrElse(5) /** * simple boolean indicator of whether the queue is empty * @return */ def isIdle:Boolean = countInProgress==0 override def receive: Receive = { case InternalMarkDone=> if(countInProgress>0) countInProgress-=1 logger.info(s"Currently $countInProgress copy jobs in progress") sender() ! Success( () ) case InternalMarkBusy=> countInProgress+=1 logger.info(s"Currently $countInProgress copy jobs in progress") sender() ! Success( () ) //used for testing/debugging to show 'current' idle state. Note that this may be stale by the time it reaches the sender! case GetIdleState=> sender() ! CurrentIdleState(isIdle) //this is called from a timer to poll for more notifications. //we only want to check for new notifications if we have no operations currently in progress case CheckForNotificationsIfIdle=> if(isIdle) { ownRef ! CheckForNotifications } //a user has requested a file move case EnqueueMove(fileId, toCollection, uid)=> logger.info(s"Received request from $uid to move $fileId to $toCollection") val msgBody = FileMoveMessage(fileId, toCollection).asJson.noSpaces Try { sqsClient.sendMessage(notificationsQueue, msgBody) } match { case scala.util.Success(_)=> logger.info(s"Successfully enqueued request for $fileId") sender() ! EnqueuedOk(fileId) case scala.util.Failure(err)=> logger.error(s"Could not enqueue request for $fileId: ${err.getMessage}", err) sender() ! EnqueuedProblem(fileId, err.getMessage) } //a file move process was completed ok case FileMoveActor.MoveSuccess(fileId, remoteMessageId)=> logger.info(s"Move of $fileId worked fine, removing message from queue") remoteMessageId match { case Some(receiptHandle) => try { sqsClient.deleteMessage(notificationsQueue, receiptHandle) } catch { case err: Throwable => logger.error(s"Could not delete message for ${fileId} with receipt handle $receiptHandle: ${err.getMessage}", err) } case None => logger.warn("No remote message ID present so can't delete message from SQS") } ownRef ! InternalMarkDone ownRef ! ReadyForNextMessage //a file move process failed case FileMoveActor.MoveFailed(fileId, error, remoteMessageId)=> if(error=="Source file did not exist" || error.contains("has been deleted in the storage")) { //noqa: yeah i don't like it much either but the alternative is very long-winded logger.error(s"Could not move file $fileId: $error. Message will be removed from the queue") remoteMessageId match { case Some(receiptHandle) => Try { sqsClient.deleteMessage(notificationsQueue, receiptHandle) } case None=> logger.warn("No remove message ID so can't delete message from SQS") } } else { logger.error(s"Could not move file $fileId: $error. Message will be left on queue to retry after a (long) delay") } ownRef ! InternalMarkDone ownRef ! ReadyForNextMessage //a request for a move came off the queue case HandleDomainMessage(msg:FileMoveMessage, queueUrl, receiptHandle)=> logger.info(s"Received copy request for ${msg.fileId} to ${msg.toCollection}") scanTargetDAO.withScanTarget(msg.toCollection) { scanTarget=> if(scanTarget.enabled) { fileMoveActor ! MoveFile(msg.fileId, scanTarget, Some(receiptHandle), ownRef) ownRef ! InternalMarkBusy //increment the busy counter if(countInProgress+1<concurrency) ownRef ! ReadyForNextMessage //we can process up to this number of messages } else { logger.warn(s"Cannot move ${msg.fileId} to ${scanTarget.bucketName} because the scan target is disabled") try { sqsClient.deleteMessage(notificationsQueue, receiptHandle) } catch { case err: Throwable => logger.error(s"Could not delete message for ${msg.fileId} with receipt handle $receiptHandle: ${err.getMessage}", err) } ownRef ! ReadyForNextMessage } } //other messages are handled by the superclass case other:SQSMsg => handleGeneric(other) } }