def makeConsumer()

in app/services/PeriodicScanReceiver.scala [48:106]


  def makeConsumer(channel:Channel) = {
    new DefaultConsumer(channel) {
      //note - docs say to avoid long-running code here because it delays dispatch of other messages on the same connection
      override def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException): Unit = super.handleShutdownSignal(consumerTag, sig)

      override def handleCancel(consumerTag: String): Unit = super.handleCancel(consumerTag)

      override def handleCancelOk(consumerTag: String): Unit = super.handleCancelOk(consumerTag)

      override def handleConsumeOk(consumerTag: String): Unit = super.handleConsumeOk(consumerTag)

      override def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]): Unit = {
        loadScanEvent(body) match {
          case Left(err)=>
            logger.error(s"Received invalid scan request with key ${envelope.getRoutingKey} from exchange ${envelope.getExchange}: ${err.getMessage}", err)

          case Right(scanEvent)=>
            handleScanEvent(consumerTag, envelope.getRoutingKey, scanEvent).map({
              case true=>
                channel.basicAck(envelope.getDeliveryTag, false)
              case false=>
                logger.warn("Could not handle message, leaving it on-queue")
                channel.basicNack(envelope.getDeliveryTag, false, true)
            })
        }
      }

      def handleScanEvent(consumerTag: String, routingKey: String, scanEvent: ScanEvent): Future[Boolean] = {
        scanEvent.action match {
          case ServiceEventAction.CancelAction=>
            logger.warn(s"Received cancel for '$routingKey' with '$consumerTag' but this is not implemented yet")
            Future(false)
          case ServiceEventAction.PerformAction=>
            routingKey match {
              case "pluto.core.service.storagescan"=>
                logger.debug("Triggering storage check in response to incoming message")
                storageScanner ! StorageScanner.Rescan
                Future(true)
              case "pluto.core.service.commissionstatuspropagator"=>
                logger.debug("Triggering commission status propagator check for retries in response to incoming message")
                commissionStatusPropagator ! CommissionStatusPropagator.RetryFromState(UUID.randomUUID())
                Future(true)
              case "pluto.core.service.postrunaction"=>
                logger.debug("Triggering postrun action check in response to incoming message")
                postrunActionScanner ! PostrunActionScanner.Rescan
                Future(true)
              case "pluto.core.service.backuptrigger"=>
                logger.info("Timed backup is now run from an external job")
                Future(true)
              case _=>
                logger.warn(s"PeriodicScanReceiver got an unknown message: $routingKey")
                Future(true)
            }
        }
      }

      override def handleRecoverOk(consumerTag: String): Unit = super.handleRecoverOk(consumerTag)
    }
  }