in online_archive/src/main/scala/Main.scala [71:161]
def main(args:Array[String]):Unit = {
implicit lazy val archivedRecordDAO = new ArchivedRecordDAO(db)
implicit lazy val failureRecordDAO = new FailureRecordDAO(db)
implicit lazy val ignoredRecordDAO = new IgnoredRecordDAO(db)
implicit lazy val archiveHunterCommunicator = new ArchiveHunterCommunicator(archiveHunterConfig)
implicit lazy val vidispineCommunicator = new VidispineCommunicator(vidispineConfig)
implicit lazy val uploader = FileUploader.createFromEnvVars("ARCHIVE_MEDIA_BUCKET") match {
case Left(err)=>
logger.error(s"Could not initialise FileUploader: $err")
Await.ready(actorSystem.terminate(), 30.seconds)
sys.exit(1)
case Right(u)=>u
}
lazy val proxyUploader = FileUploader.createFromEnvVars("ARCHIVE_PROXY_BUCKET") match {
case Left(err)=>
logger.error(s"Could not initialise ProxyFileUploader: $err")
Await.ready(actorSystem.terminate(), 30.seconds)
sys.exit(1)
case Right(u)=>u
}
lazy implicit val vidispineFunctions = new VidispineFunctions(uploader, proxyUploader)
val deliverablesConfig = PlutoDeliverablesConfig()
val config = Seq(
ProcessorConfiguration(
"assetsweeper",
Seq("assetsweeper.asset_folder_importer.file.new","assetsweeper.asset_folder_importer.file.update", "assetsweeper.replay.file"),
Seq("storagetier.onlinearchive.newfile","storagetier.onlinearchive.newfile","storagetier.onlinearchive.replay"),
new AssetSweeperMessageProcessor(plutoConfig)
),
ProcessorConfiguration(
"vidispine-events",
Seq("vidispine.job.raw_import.stop", "vidispine.job.essence_version.stop", "vidispine.item.shape.modify", "vidispine.item.metadata.modify", "vidispine.itemneedsarchive.nearline", "vidispine.itemneedsarchive.online"),
Seq("storagetier.onlinearchive.newfile", "storagetier.onlinearchive.newfile", "storagetier.onlinearchive.vidispineupdate", "storagetier.onlinearchive.vidispineupdate", "storagetier.onlinearchive.newfile.nearline", "storagetier.onlinearchive.newfile.online"),
new VidispineMessageProcessor(plutoConfig, deliverablesConfig)
),
ProcessorConfiguration(
OUTPUT_EXCHANGE_NAME,
Seq("storagetier.onlinearchive.newfile.success","storagetier.onlinearchive.request.*","storagetier.onlinearchive.replay.success","storagetier.onlinearchive.newfile.nearline.success","storagetier.onlinearchive.newfile.online.success"),
Seq("storagetier.onlinearchive.mediaingest","storagetier.onlinearchive.requested","storagetier.onlinearchive.replayed","storagetier.onlinearchive.mediaingest.nearline","storagetier.onlinearchive.mediaingest.online"),
new OwnMessageProcessor()
)
)
MessageProcessingFramework(
"storagetier-online-archive",
OUTPUT_EXCHANGE_NAME,
"pluto.storagetier.online-archive",
"storagetier-online-archive-retry",
"storagetier-online-archive-fail",
"storagetier-online-archive-dlq",
config
) match {
case Left(err) =>
logger.error(s"Could not initiate message processing framework: $err")
actorSystem.terminate()
case Right(framework) =>
//install a signal handler to terminate cleanly on INT (keyboard interrupt) and TERM (Kubernetes pod shutdown)
val terminationHandler = new SignalHandler {
override def handle(signal: Signal): Unit = {
logger.info(s"Caught signal $signal, terminating")
framework.terminate()
}
}
Signal.handle(new Signal("INT"), terminationHandler)
Signal.handle(new Signal("HUP"), terminationHandler)
Signal.handle(new Signal("TERM"), terminationHandler)
//first initialise all the tables that we need, then run the framework.
//add in more table initialises as required
Future.sequence(Seq(
ignoredRecordDAO.initialiseSchema,
archivedRecordDAO.initialiseSchema,
failureRecordDAO.initialiseSchema,
))
.flatMap(_=>framework.run())
.onComplete({
case Success(_) =>
logger.info(s"framework run completed")
Await.ready(actorSystem.terminate(), 10.minutes)
sys.exit(0)
case Failure(err) =>
logger.error(s"framework run failed: ${err.getMessage}", err)
Await.ready(actorSystem.terminate(), 10.minutes)
sys.exit(1)
})
}
}