in app/services/PeriodicScanReceiver.scala [48:106]
def makeConsumer(channel:Channel) = {
new DefaultConsumer(channel) {
//note - docs say to avoid long-running code here because it delays dispatch of other messages on the same connection
override def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException): Unit = super.handleShutdownSignal(consumerTag, sig)
override def handleCancel(consumerTag: String): Unit = super.handleCancel(consumerTag)
override def handleCancelOk(consumerTag: String): Unit = super.handleCancelOk(consumerTag)
override def handleConsumeOk(consumerTag: String): Unit = super.handleConsumeOk(consumerTag)
override def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]): Unit = {
loadScanEvent(body) match {
case Left(err)=>
logger.error(s"Received invalid scan request with key ${envelope.getRoutingKey} from exchange ${envelope.getExchange}: ${err.getMessage}", err)
case Right(scanEvent)=>
handleScanEvent(consumerTag, envelope.getRoutingKey, scanEvent).map({
case true=>
channel.basicAck(envelope.getDeliveryTag, false)
case false=>
logger.warn("Could not handle message, leaving it on-queue")
channel.basicNack(envelope.getDeliveryTag, false, true)
})
}
}
def handleScanEvent(consumerTag: String, routingKey: String, scanEvent: ScanEvent): Future[Boolean] = {
scanEvent.action match {
case ServiceEventAction.CancelAction=>
logger.warn(s"Received cancel for '$routingKey' with '$consumerTag' but this is not implemented yet")
Future(false)
case ServiceEventAction.PerformAction=>
routingKey match {
case "pluto.core.service.storagescan"=>
logger.debug("Triggering storage check in response to incoming message")
storageScanner ! StorageScanner.Rescan
Future(true)
case "pluto.core.service.commissionstatuspropagator"=>
logger.debug("Triggering commission status propagator check for retries in response to incoming message")
commissionStatusPropagator ! CommissionStatusPropagator.RetryFromState(UUID.randomUUID())
Future(true)
case "pluto.core.service.postrunaction"=>
logger.debug("Triggering postrun action check in response to incoming message")
postrunActionScanner ! PostrunActionScanner.Rescan
Future(true)
case "pluto.core.service.backuptrigger"=>
logger.info("Timed backup is now run from an external job")
Future(true)
case _=>
logger.warn(s"PeriodicScanReceiver got an unknown message: $routingKey")
Future(true)
}
}
}
override def handleRecoverOk(consumerTag: String): Unit = super.handleRecoverOk(consumerTag)
}
}