def main()

in online_archive/src/main/scala/Main.scala [71:161]


  /**
   * Service entry point. Builds the DAOs, communicators and file uploaders, declares the message
   * processors, then initialises the database tables and runs the message processing framework.
   *
   * Every startup failure (an uploader or the framework could not be created) is logged, the actor
   * system is shut down and the JVM exits with status 1. Once the framework is running, the JVM exits
   * with status 0 when the run completes and with status 1 when it fails.
   *
   * @param args command-line arguments; not used by this method
   */
  def main(args:Array[String]):Unit = {
    // Creates a FileUploader from the given environment variable name. If creation fails the error is
    // logged under `label`, the actor system is shut down and the process exits with status 1.
    def uploaderOrExit(bucketEnvVar:String, label:String) = FileUploader.createFromEnvVars(bucketEnvVar) match {
      case Left(err)=>
        logger.error(s"Could not initialise $label: $err")
        Await.ready(actorSystem.terminate(), 30.seconds)
        sys.exit(1)
      case Right(u)=>u
    }

    implicit lazy val archivedRecordDAO = new ArchivedRecordDAO(db)
    implicit lazy val failureRecordDAO = new FailureRecordDAO(db)
    implicit lazy val ignoredRecordDAO = new IgnoredRecordDAO(db)
    implicit lazy val archiveHunterCommunicator = new ArchiveHunterCommunicator(archiveHunterConfig)
    implicit lazy val vidispineCommunicator = new VidispineCommunicator(vidispineConfig)

    // These are lazy, so a missing/invalid uploader configuration only aborts the process
    // when the uploader is first needed.
    implicit lazy val uploader = uploaderOrExit("ARCHIVE_MEDIA_BUCKET", "FileUploader")
    lazy val proxyUploader = uploaderOrExit("ARCHIVE_PROXY_BUCKET", "ProxyFileUploader")

    implicit lazy val vidispineFunctions = new VidispineFunctions(uploader, proxyUploader)
    val deliverablesConfig = PlutoDeliverablesConfig()

    // One entry per exchange we listen to. The two Seqs in each entry have the same length;
    // presumably they pair each incoming routing key with the routing key used for the
    // corresponding output, position by position -- TODO confirm against ProcessorConfiguration.
    val config = Seq(
      ProcessorConfiguration(
        "assetsweeper",
        Seq("assetsweeper.asset_folder_importer.file.new","assetsweeper.asset_folder_importer.file.update", "assetsweeper.replay.file"),
        Seq("storagetier.onlinearchive.newfile","storagetier.onlinearchive.newfile","storagetier.onlinearchive.replay"),
        new AssetSweeperMessageProcessor(plutoConfig)
      ),
      ProcessorConfiguration(
        "vidispine-events",
        Seq("vidispine.job.raw_import.stop", "vidispine.job.essence_version.stop", "vidispine.item.shape.modify", "vidispine.item.metadata.modify", "vidispine.itemneedsarchive.nearline", "vidispine.itemneedsarchive.online"),
        Seq("storagetier.onlinearchive.newfile", "storagetier.onlinearchive.newfile", "storagetier.onlinearchive.vidispineupdate", "storagetier.onlinearchive.vidispineupdate", "storagetier.onlinearchive.newfile.nearline", "storagetier.onlinearchive.newfile.online"),
        new VidispineMessageProcessor(plutoConfig, deliverablesConfig)
      ),
      ProcessorConfiguration(
        OUTPUT_EXCHANGE_NAME,
        Seq("storagetier.onlinearchive.newfile.success","storagetier.onlinearchive.request.*","storagetier.onlinearchive.replay.success","storagetier.onlinearchive.newfile.nearline.success","storagetier.onlinearchive.newfile.online.success"),
        Seq("storagetier.onlinearchive.mediaingest","storagetier.onlinearchive.requested","storagetier.onlinearchive.replayed","storagetier.onlinearchive.mediaingest.nearline","storagetier.onlinearchive.mediaingest.online"),
        new OwnMessageProcessor()
      )
    )

    MessageProcessingFramework(
      "storagetier-online-archive",
      OUTPUT_EXCHANGE_NAME,
      "pluto.storagetier.online-archive",
      "storagetier-online-archive-retry",
      "storagetier-online-archive-fail",
      "storagetier-online-archive-dlq",
      config
    ) match {
      case Left(err) =>
        logger.error(s"Could not initiate message processing framework: $err")
        // Wait for the actor system to stop and exit non-zero, exactly like the other startup
        // failures above; otherwise main would simply return and the failure would not be
        // reflected in the process exit status.
        Await.ready(actorSystem.terminate(), 30.seconds)
        sys.exit(1)
      case Right(framework) =>
        //install a signal handler to terminate cleanly on INT (keyboard interrupt), HUP and TERM (Kubernetes pod shutdown)
        val terminationHandler = new SignalHandler {
          override def handle(signal: Signal): Unit = {
            logger.info(s"Caught signal $signal, terminating")
            framework.terminate()
          }
        }
        Seq("INT", "HUP", "TERM").foreach(name => Signal.handle(new Signal(name), terminationHandler))

        //first initialise all the tables that we need, then run the framework.
        //add in more table initialises as required
        Future.sequence(Seq(
          ignoredRecordDAO.initialiseSchema,
          archivedRecordDAO.initialiseSchema,
          failureRecordDAO.initialiseSchema
        ))
          .flatMap(_=>framework.run())
          .onComplete({
            case Success(_) =>
              logger.info(s"framework run completed")
              Await.ready(actorSystem.terminate(), 10.minutes)
              sys.exit(0)
            case Failure(err) =>
              logger.error(s"framework run failed: ${err.getMessage}", err)
              Await.ready(actorSystem.terminate(), 10.minutes)
              sys.exit(1)
          })
    }
  }