def concurrency:Int = config.getOptional[Int]()

in app/services/FileMoveQueue.scala [106:207]


  /**
    * Upper limit used by the message handler when deciding whether to ask for another queue message
    * while moves are already in progress.
    * @return the value of the `filemover.concurrency` configuration key, or 5 if that key is not set
    */
  def concurrency:Int = config.getOptional[Int]("filemover.concurrency") match {
    case Some(configuredLimit) => configuredLimit
    case None => 5
  }

  /**
    * Tells whether the queue currently has no copy jobs counted as in progress.
    * @return true when the in-progress counter is exactly zero, false otherwise
    */
  def isIdle: Boolean = countInProgress == 0

  /**
    * Message handler for the file move queue.
    *
    * Cases handled here:
    *  - `InternalMarkDone` / `InternalMarkBusy`: adjust the in-progress counter and acknowledge to the sender
    *  - `GetIdleState`: reply with the current idle flag
    *  - `CheckForNotificationsIfIdle`: trigger a check for notifications, but only when nothing is in progress
    *  - `EnqueueMove`: send a move request to the notifications queue and reply `EnqueuedOk` or `EnqueuedProblem`
    *  - `FileMoveActor.MoveSuccess` / `FileMoveActor.MoveFailed`: delete the queue message where appropriate,
    *    then mark the job done and ask for the next message
    *  - `HandleDomainMessage`: hand a dequeued request to the file move actor, or drop it if the target is disabled
    *  - any other `SQSMsg`: delegated to `handleGeneric`
    *
    * Deleting a message from SQS is best-effort: a failure is logged and processing carries on. Only
    * non-fatal exceptions are caught, so fatal JVM errors are not swallowed by this handler.
    */
  override def receive: Receive = {
    //a copy job has finished; the guard stops the counter from ever going below zero
    case InternalMarkDone=>
      if(countInProgress>0) countInProgress-=1
      logger.info(s"Currently $countInProgress copy jobs in progress")
      sender() ! Success( () )

    //a copy job has been started
    case InternalMarkBusy=>
      countInProgress+=1
      logger.info(s"Currently $countInProgress copy jobs in progress")
      sender() ! Success( () )

    //used for testing/debugging to show 'current' idle state. Note that this may be stale by the time it reaches the sender!
    case GetIdleState=>
      sender() ! CurrentIdleState(isIdle)

    //this is called from a timer to poll for more notifications.
    //we only want to check for new notifications if we have no operations currently in progress
    case CheckForNotificationsIfIdle=>
      if(isIdle) {
        ownRef ! CheckForNotifications
      }

    //a user has requested a file move
    case EnqueueMove(fileId, toCollection, uid)=>
      logger.info(s"Received request from $uid to move $fileId to $toCollection")
      val msgBody = FileMoveMessage(fileId, toCollection).asJson.noSpaces
      Try { sqsClient.sendMessage(notificationsQueue, msgBody) } match {
        case scala.util.Success(_)=>
          logger.info(s"Successfully enqueued request for $fileId")
          sender() ! EnqueuedOk(fileId)
        case scala.util.Failure(err)=>
          logger.error(s"Could not enqueue request for $fileId: ${err.getMessage}", err)
          sender() ! EnqueuedProblem(fileId, err.getMessage)
      }

    //a file move process was completed ok
    case FileMoveActor.MoveSuccess(fileId, remoteMessageId)=>
      logger.info(s"Move of $fileId worked fine, removing message from queue")

      remoteMessageId match {
        case Some(receiptHandle) =>
          try {
            sqsClient.deleteMessage(notificationsQueue, receiptHandle)
          } catch {
            //best-effort delete: log and carry on, but let fatal errors propagate
            case scala.util.control.NonFatal(err) =>
              logger.error(s"Could not delete message for ${fileId} with receipt handle $receiptHandle: ${err.getMessage}", err)
          }
        case None =>
          logger.warn("No remote message ID present so can't delete message from SQS")
      }
      ownRef ! InternalMarkDone
      ownRef ! ReadyForNextMessage

    //a file move process failed
    case FileMoveActor.MoveFailed(fileId, error, remoteMessageId)=>
      //these two error texts mean the source is gone, so the message is removed rather than left for a retry.
      //matching on the text is not pretty but the alternative is very long-winded
      val sourceIsGone = error=="Source file did not exist" || error.contains("has been deleted in the storage")
      if(sourceIsGone) {
        logger.error(s"Could not move file $fileId: $error. Message will be removed from the queue")
        remoteMessageId match {
          case Some(receiptHandle) =>
            try {
              sqsClient.deleteMessage(notificationsQueue, receiptHandle)
            } catch {
              //previously a failure here was dropped without trace; log it like the other delete sites do
              case scala.util.control.NonFatal(err) =>
                logger.error(s"Could not delete message for ${fileId} with receipt handle $receiptHandle: ${err.getMessage}", err)
            }
          case None=>
            logger.warn("No remote message ID so can't delete message from SQS")
        }
      } else {
        logger.error(s"Could not move file $fileId: $error. Message will be left on queue to retry after a (long) delay")
      }
      ownRef ! InternalMarkDone
      ownRef ! ReadyForNextMessage

    //a request for a move came off the queue
    case HandleDomainMessage(msg:FileMoveMessage, queueUrl, receiptHandle)=>
      logger.info(s"Received copy request for ${msg.fileId} to ${msg.toCollection}")

      //NOTE(review): if withScanTarget runs this block asynchronously then countInProgress is being read
      //outside of normal message processing - confirm against the DAO implementation
      scanTargetDAO.withScanTarget(msg.toCollection) { scanTarget=>
        if(scanTarget.enabled) {
          fileMoveActor ! MoveFile(msg.fileId, scanTarget, Some(receiptHandle), ownRef)
          ownRef ! InternalMarkBusy //increment the busy counter
          //InternalMarkBusy has only just been sent, so the counter does not include this job yet; hence the +1
          if(countInProgress+1<concurrency) ownRef ! ReadyForNextMessage  //we can process up to this number of messages
        } else {
          logger.warn(s"Cannot move ${msg.fileId} to ${scanTarget.bucketName} because the scan target is disabled")
          try {
            sqsClient.deleteMessage(notificationsQueue, receiptHandle)
          } catch {
            //best-effort delete: log and carry on, but let fatal errors propagate
            case scala.util.control.NonFatal(err) =>
              logger.error(s"Could not delete message for ${msg.fileId} with receipt handle $receiptHandle: ${err.getMessage}", err)
          }

          ownRef ! ReadyForNextMessage
        }
      }

    //other messages are handled by the superclass
    case other:SQSMsg => handleGeneric(other)
  }