in app/services/FileMoveQueue.scala [106:207]
/**
 * Upper bound on the number of file moves this actor will keep in progress at once.
 * Looked up from the `filemover.concurrency` configuration key on every call.
 * @return the configured value, or 5 when the key is not set
 */
def concurrency: Int = {
  val configured = config.getOptional[Int]("filemover.concurrency")
  configured.getOrElse(5)
}
/**
 * Indicates whether this actor currently has no copy jobs in progress.
 * This reflects the in-progress counter only, not the contents of the remote queue.
 * @return true when the in-progress counter is zero, false otherwise
 */
def isIdle: Boolean = countInProgress == 0
/**
 * Message handler for the file move queue.
 *
 * Keeps a counter of copy jobs in progress, forwards user move requests onto the
 * notifications queue, dispatches requests that come off the queue to `fileMoveActor`
 * and removes messages from the queue once they have been dealt with.
 * Any other [[SQSMsg]] is passed on to `handleGeneric`.
 */
override def receive: Receive = {
  //a copy job has finished (successfully or not). The counter is never taken below zero.
  case InternalMarkDone =>
    if (countInProgress > 0) countInProgress -= 1
    logger.info(s"Currently $countInProgress copy jobs in progress")
    sender() ! Success( () )

  //a copy job has been handed to the file move actor
  case InternalMarkBusy =>
    countInProgress += 1
    logger.info(s"Currently $countInProgress copy jobs in progress")
    sender() ! Success( () )

  //used for testing/debugging to show 'current' idle state. Note that this may be stale by the time it reaches the sender!
  case GetIdleState =>
    sender() ! CurrentIdleState(isIdle)

  //this is called from a timer to poll for more notifications.
  //we only want to check for new notifications if we have no operations currently in progress
  case CheckForNotificationsIfIdle =>
    if (isIdle) ownRef ! CheckForNotifications

  //a user has requested a file move; put it onto the queue and tell the sender how that went
  case EnqueueMove(fileId, toCollection, uid) =>
    logger.info(s"Received request from $uid to move $fileId to $toCollection")
    val msgBody = FileMoveMessage(fileId, toCollection).asJson.noSpaces
    Try { sqsClient.sendMessage(notificationsQueue, msgBody) } match {
      case scala.util.Success(_) =>
        logger.info(s"Successfully enqueued request for $fileId")
        sender() ! EnqueuedOk(fileId)
      case scala.util.Failure(err) =>
        logger.error(s"Could not enqueue request for $fileId: ${err.getMessage}", err)
        sender() ! EnqueuedProblem(fileId, err.getMessage)
    }

  //a file move process was completed ok
  case FileMoveActor.MoveSuccess(fileId, remoteMessageId) =>
    logger.info(s"Move of $fileId worked fine, removing message from queue")
    remoteMessageId match {
      case Some(receiptHandle) =>
        deleteQueueMessage(fileId, receiptHandle)
      case None =>
        logger.warn("No remote message ID present so can't delete message from SQS")
    }
    ownRef ! InternalMarkDone
    ownRef ! ReadyForNextMessage

  //a file move process failed
  case FileMoveActor.MoveFailed(fileId, error, remoteMessageId) =>
    //errors that a retry cannot fix are recognised by their message text; not pretty, but the alternative is very long-winded
    val sourceIsGone = error == "Source file did not exist" || error.contains("has been deleted in the storage")
    if (sourceIsGone) {
      logger.error(s"Could not move file $fileId: $error. Message will be removed from the queue")
      remoteMessageId match {
        case Some(receiptHandle) =>
          //a failed delete is logged by the helper rather than silently dropped
          deleteQueueMessage(fileId, receiptHandle)
        case None =>
          logger.warn("No remote message ID so can't delete message from SQS")
      }
    } else {
      logger.error(s"Could not move file $fileId: $error. Message will be left on queue to retry after a (long) delay")
    }
    ownRef ! InternalMarkDone
    ownRef ! ReadyForNextMessage

  //a request for a move came off the queue
  //NOTE(review): queueUrl is bound but not used; deletes go to notificationsQueue - confirm they are always the same queue
  case HandleDomainMessage(msg: FileMoveMessage, queueUrl, receiptHandle) =>
    logger.info(s"Received copy request for ${msg.fileId} to ${msg.toCollection}")
    //NOTE(review): the result of withScanTarget is discarded. If the lookup can fail, or runs the callback
    //on another thread, nothing here asks for the next message or guards countInProgress - confirm against the DAO.
    scanTargetDAO.withScanTarget(msg.toCollection) { scanTarget =>
      if (scanTarget.enabled) {
        fileMoveActor ! MoveFile(msg.fileId, scanTarget, Some(receiptHandle), ownRef)
        ownRef ! InternalMarkBusy //increment the busy counter
        //the counter only goes up once InternalMarkBusy has been handled, hence the +1 here
        if (countInProgress + 1 < concurrency) ownRef ! ReadyForNextMessage //we can process up to this number of messages
      } else {
        logger.warn(s"Cannot move ${msg.fileId} to ${scanTarget.bucketName} because the scan target is disabled")
        deleteQueueMessage(msg.fileId, receiptHandle)
        ownRef ! ReadyForNextMessage
      }
    }

  //other messages are handled by the superclass
  case other: SQSMsg => handleGeneric(other)
}

/**
 * Best-effort removal of a message from the notifications queue.
 * A non-fatal failure is logged and otherwise ignored so that the caller can carry on
 * with its own bookkeeping; fatal errors are not caught.
 *
 * @param fileId        id of the file the message referred to, used for logging only
 * @param receiptHandle SQS receipt handle identifying the message to delete
 */
private def deleteQueueMessage(fileId: String, receiptHandle: String): Unit =
  Try { sqsClient.deleteMessage(notificationsQueue, receiptHandle) }.failed.foreach { err =>
    logger.error(s"Could not delete message for ${fileId} with receipt handle $receiptHandle: ${err.getMessage}", err)
  }