in media_remover/src/main/scala/Main.scala [65:156]
def main(args:Array[String]):Unit = {
implicit lazy val pendingDeletionRecordDAO = new PendingDeletionRecordDAO(db)
implicit lazy val nearlineRecordDAO = new NearlineRecordDAO(db)
implicit val matrixStore = new MXSConnectionBuilderImpl(
hosts = matrixStoreConfig.hosts,
accessKeyId = matrixStoreConfig.accessKeyId,
accessKeySecret = matrixStoreConfig.accessKeySecret,
clusterId = matrixStoreConfig.clusterId,
maxIdleSeconds = connectionIdleTime
)
val assetFolderLookup = new AssetFolderLookup(plutoConfig)
implicit lazy val checksumChecker = new ChecksumChecker()
implicit lazy val vidispineCommunicator = new VidispineCommunicator(vidispineConfig)
implicit lazy val s3ObjectChecker = S3ObjectChecker.createFromEnvVars("ARCHIVE_MEDIA_BUCKET") match {
case Left(err)=>
logger.error(s"Could not initialise FileUploader: $err")
Await.ready(actorSystem.terminate(), 30.seconds)
sys.exit(1)
case Right(u)=>u
}
implicit lazy val pendingDeletionHelper = new PendingDeletionHelper()
implicit lazy val onlineHelper = new OnlineHelper()
implicit lazy val nearlineHelper = new NearlineHelper(assetFolderLookup)
val config = Seq(
ProcessorConfiguration(
exchangeName = "storagetier-project-restorer",
routingKey = Seq("storagetier.restorer.media_not_required.online", "storagetier.restorer.media_not_required.nearline"),
outputRoutingKey = Seq("storagetier.mediaremover.removedfile.online", "storagetier.mediaremover.removedfile.nearline"),
new MediaNotRequiredMessageProcessor(assetFolderLookup) // may also send "storagetier.nearline.internalarchive.required"
),
ProcessorConfiguration(
exchangeName = "storagetier-online-nearline",
routingKey = Seq("storagetier.nearline.internalarchive.nearline", "storagetier.nearline.internalarchive.online", "storagetier.nearline.newfile.success"),
outputRoutingKey = Seq("storagetier.mediaremover.removedfile.nearline", "storagetier.mediaremover.removedfile.online", "storagetier.mediaremover.removedfile.online"),
new OnlineNearlineMessageProcessor(assetFolderLookup, selfHealRetryLimit) // may also send "storagetier.nearline.internalarchive.required"
),
ProcessorConfiguration(
exchangeName = "storagetier-online-archive",
routingKey = Seq("storagetier.onlinearchive.mediaingest.nearline", "storagetier.onlinearchive.mediaingest.online"),
outputRoutingKey = Seq("storagetier.mediaremover.removedfile.nearline", "storagetier.mediaremover.removedfile.online"),
new OnlineArchiveMessageProcessor(assetFolderLookup, selfHealRetryLimit) // may also send "vidispine.itemneedsarchive.{nearline|online}"
),
)
MessageProcessingFramework(
ingest_queue_name = "storagetier-media-remover",
output_exchange_name = OUTPUT_EXCHANGE_NAME,
routingKeyForSend = "pluto.storagetier.media-remover",
retryExchangeName = "storagetier-media-remover-retry",
failedExchangeName = "storagetier-media-remover-fail",
failedQueueName = "storagetier-media-remover-dlq",
handlers = config,
maximumRetryLimit = retryLimit
) match {
case Left(err) =>
logger.error(s"Could not initiate message processing framework: $err")
actorSystem.terminate()
case Right(framework) =>
//install a signal handler to terminate cleanly on INT (keyboard interrupt) and TERM (Kubernetes pod shutdown)
val terminationHandler = new SignalHandler {
override def handle(signal: Signal): Unit = {
logger.info(s"Caught signal $signal, terminating")
framework.terminate()
}
}
Signal.handle(new Signal("INT"), terminationHandler)
Signal.handle(new Signal("HUP"), terminationHandler)
Signal.handle(new Signal("TERM"), terminationHandler)
//first initialise all the tables that we need, then run the framework.
//add in more table initialises as required
Future.sequence(Seq(
pendingDeletionRecordDAO.initialiseSchema,
))
.flatMap(_=>framework.run())
.onComplete({
case Success(_) =>
logger.info(s"framework run completed")
Await.ready(actorSystem.terminate(), 10.minutes)
sys.exit(0)
case Failure(err) =>
logger.error(s"framework run failed: ${err.getMessage}", err)
Await.ready(actorSystem.terminate(), 10.minutes)
sys.exit(1)
})
}
}