in online_nearline/src/main/scala/Main.scala [62:147]
/**
  * Entry point. Builds the DAOs, the MatrixStore connection builder, the Vidispine communicator and the
  * file copier, declares the processor configurations and hands them to the MessageProcessingFramework.
  *
  * The process exits with status 0 when the framework run completes successfully, and with status 1 when
  * either the framework could not be initiated or the run fails. In every case the actor system is
  * terminated before the JVM exits.
  *
  * @param args command-line arguments; not read by this method
  */
def main(args:Array[String]):Unit = {
  implicit lazy val nearlineRecordDAO = new NearlineRecordDAO(db)
  implicit lazy val failureRecordDAO = new FailureRecordDAO(db)
  implicit val matrixStore = new MXSConnectionBuilderImpl(
    hosts = matrixStoreConfig.hosts,
    accessKeyId = matrixStoreConfig.accessKeyId,
    accessKeySecret = matrixStoreConfig.accessKeySecret,
    clusterId = matrixStoreConfig.clusterId,
    maxIdleSeconds = connectionIdleTime
  )
  val assetFolderLookup = new AssetFolderLookup(plutoConfig)
  implicit lazy val vidispineCommunicator = new VidispineCommunicator(vidispineConfig)
  implicit lazy val checksumChecker = new ChecksumChecker()
  implicit lazy val fileCopier = new FileCopier(checksumChecker)

  // NOTE(review): each entry appears to be (exchange name, routing key(s) to listen for, output routing key(s),
  // processor), with the two key lists paired positionally - confirm against ProcessorConfiguration.
  val config = Seq(
    ProcessorConfiguration(
      "assetsweeper",
      "assetsweeper.asset_folder_importer.file.#",
      "storagetier.nearline.newfile",
      new AssetSweeperMessageProcessor()
    ),
    ProcessorConfiguration(
      OUTPUT_EXCHANGE_NAME,
      Seq("storagetier.nearline.newfile.success", "storagetier.nearline.metadata.success", "storagetier.nearline.internalarchive.required", "storagetier.nearline.internalarchive.required.nearline", "storagetier.nearline.internalarchive.required.online"),
      Seq("storagetier.nearline.metadata", "storagetier.nearline.vsupdate", "storagetier.nearline.internalarchive", "storagetier.nearline.internalarchive.nearline", "storagetier.nearline.internalarchive.online"),
      new OwnMessageProcessor(matrixStoreConfig, assetFolderLookup, OUTPUT_EXCHANGE_NAME)
    ),
    ProcessorConfiguration(
      "vidispine-events",
      Seq("vidispine.job.raw_import.stop", "vidispine.job.essence_version.stop", "vidispine.item.metadata.modify", "vidispine.item.shape.modify", "vidispine.itemneedsbackup"),
      Seq("storagetier.nearline.newfile","storagetier.nearline.newfile", "storagetier.nearline.vidispineupdate", "storagetier.nearline.vidispineupdate", "storagetier.nearline.newfile"),
      new VidispineMessageProcessor()
    ),
    ProcessorConfiguration(
      "storagetier-media-remover",
      Seq("storagetier.nearline.internalarchive.required.nearline", "storagetier.nearline.internalarchive.required.online"),
      Seq("storagetier.nearline.internalarchive.required.nearline", "storagetier.nearline.internalarchive.required.online"),
      new MediaRemoverMessageProcessor()
    )
  )

  // Shared shutdown path: wait (up to 10 minutes) for the actor system to stop, then exit with the given status.
  def terminateAndExit(exitCode: Int): Nothing = {
    Await.ready(actorSystem.terminate(), 10.minutes)
    sys.exit(exitCode)
  }

  MessageProcessingFramework(
    "storagetier-online-nearline",
    OUTPUT_EXCHANGE_NAME,
    "pluto.storagetier.online-nearline",
    "storagetier-online-nearline-retry",
    "storagetier-online-nearline-fail",
    "storagetier-online-nearline-dlq",
    config,
    maximumRetryLimit = retryLimit
  ) match {
    case Left(err) =>
      logger.error(s"Could not initiate message processing framework: $err")
      // A startup failure must be visible to whatever supervises the process, so exit non-zero
      // just like a failed framework run does.
      terminateAndExit(1)
    case Right(framework) =>
      //install a signal handler to terminate cleanly on INT (keyboard interrupt), HUP and TERM (Kubernetes pod shutdown)
      val terminationHandler = new SignalHandler {
        override def handle(signal: Signal): Unit = {
          logger.info(s"Caught signal $signal, terminating")
          framework.terminate()
        }
      }
      Signal.handle(new Signal("INT"), terminationHandler)
      Signal.handle(new Signal("HUP"), terminationHandler)
      Signal.handle(new Signal("TERM"), terminationHandler)

      //first initialise all the tables that we need, then run the framework.
      //add in more table initialises as required
      Future.sequence(Seq(
        nearlineRecordDAO.initialiseSchema,
        failureRecordDAO.initialiseSchema
      ))
        .flatMap(_=>framework.run())
        .onComplete({
          case Success(_) =>
            logger.info("framework run completed")
            terminateAndExit(0)
          case Failure(err) =>
            logger.error(s"framework run failed: ${err.getMessage}", err)
            terminateAndExit(1)
        })
  }
}