def main()

in online_nearline/src/main/scala/Main.scala [62:147]


  /**
   * Application entry point.
   *
   * Creates the DAOs and service clients that are shared implicitly with the message processors,
   * declares one [[ProcessorConfiguration]] per source exchange, and hands these to
   * [[MessageProcessingFramework]]. If the framework is built successfully, signal handlers are
   * installed, the database schemas are initialised and the framework is run; the process exits
   * with status 0 once the run completes and status 1 if it fails.
   *
   * If the framework cannot be built at all, the error is logged, the actor system is shut down
   * and the process exits with status 1.
   *
   * @param args command-line arguments; not read by this method
   */
  def main(args: Array[String]): Unit = {
    implicit lazy val nearlineRecordDAO = new NearlineRecordDAO(db)
    implicit lazy val failureRecordDAO = new FailureRecordDAO(db)
    implicit val matrixStore = new MXSConnectionBuilderImpl(
      hosts = matrixStoreConfig.hosts,
      accessKeyId = matrixStoreConfig.accessKeyId,
      accessKeySecret = matrixStoreConfig.accessKeySecret,
      clusterId = matrixStoreConfig.clusterId,
      maxIdleSeconds = connectionIdleTime
    )
    val assetFolderLookup = new AssetFolderLookup(plutoConfig)
    implicit lazy val vidispineCommunicator = new VidispineCommunicator(vidispineConfig)
    implicit lazy val checksumChecker = new ChecksumChecker()
    implicit lazy val fileCopier = new FileCopier(checksumChecker)

    // NOTE(review): the arguments below are positional; they appear to be
    // (source exchange, input routing key(s), output routing key(s), processor), with the n-th
    // input key paired with the n-th output key - confirm against ProcessorConfiguration.
    val config = Seq(
      ProcessorConfiguration(
        "assetsweeper",
        "assetsweeper.asset_folder_importer.file.#",
        "storagetier.nearline.newfile",
        new AssetSweeperMessageProcessor()
      ),
      ProcessorConfiguration(
        OUTPUT_EXCHANGE_NAME,
        Seq(
          "storagetier.nearline.newfile.success",
          "storagetier.nearline.metadata.success",
          "storagetier.nearline.internalarchive.required",
          "storagetier.nearline.internalarchive.required.nearline",
          "storagetier.nearline.internalarchive.required.online"
        ),
        Seq(
          "storagetier.nearline.metadata",
          "storagetier.nearline.vsupdate",
          "storagetier.nearline.internalarchive",
          "storagetier.nearline.internalarchive.nearline",
          "storagetier.nearline.internalarchive.online"
        ),
        new OwnMessageProcessor(matrixStoreConfig, assetFolderLookup, OUTPUT_EXCHANGE_NAME)
      ),
      ProcessorConfiguration(
        "vidispine-events",
        Seq(
          "vidispine.job.raw_import.stop",
          "vidispine.job.essence_version.stop",
          "vidispine.item.metadata.modify",
          "vidispine.item.shape.modify",
          "vidispine.itemneedsbackup"
        ),
        Seq(
          "storagetier.nearline.newfile",
          "storagetier.nearline.newfile",
          "storagetier.nearline.vidispineupdate",
          "storagetier.nearline.vidispineupdate",
          "storagetier.nearline.newfile"
        ),
        new VidispineMessageProcessor()
      ),
      ProcessorConfiguration(
        "storagetier-media-remover",
        Seq(
          "storagetier.nearline.internalarchive.required.nearline",
          "storagetier.nearline.internalarchive.required.online"
        ),
        Seq(
          "storagetier.nearline.internalarchive.required.nearline",
          "storagetier.nearline.internalarchive.required.online"
        ),
        new MediaRemoverMessageProcessor()
      )
    )

    // Waits (up to 10 minutes) for the actor system to shut down, then ends the JVM with the
    // given status. Shared by every exit path so they all report a meaningful exit code.
    def shutdownAndExit(exitCode: Int): Nothing = {
      Await.ready(actorSystem.terminate(), 10.minutes)
      sys.exit(exitCode)
    }

    MessageProcessingFramework(
      "storagetier-online-nearline",
      OUTPUT_EXCHANGE_NAME,
      "pluto.storagetier.online-nearline",
      "storagetier-online-nearline-retry",
      "storagetier-online-nearline-fail",
      "storagetier-online-nearline-dlq",
      config,
      maximumRetryLimit = retryLimit
    ) match {
      case Left(err) =>
        logger.error(s"Could not initiate message processing framework: $err")
        // A start-up failure must not look like a clean exit: wait for the actor system to stop
        // and report a non-zero status, mirroring the run-failure path below.
        shutdownAndExit(1)
      case Right(framework) =>
        //install a signal handler to terminate cleanly on INT (keyboard interrupt), HUP and TERM (Kubernetes pod shutdown)
        val terminationHandler = new SignalHandler {
          override def handle(signal: Signal): Unit = {
            logger.info(s"Caught signal $signal, terminating")
            framework.terminate()
          }
        }
        Signal.handle(new Signal("INT"), terminationHandler)
        Signal.handle(new Signal("HUP"), terminationHandler)
        Signal.handle(new Signal("TERM"), terminationHandler)

        //first initialise all the tables that we need, then run the framework.
        //add in more table initialises as required
        Future.sequence(Seq(
          nearlineRecordDAO.initialiseSchema,
          failureRecordDAO.initialiseSchema
        ))
          .flatMap(_ => framework.run())
          .onComplete({
            case Success(_) =>
              logger.info(s"framework run completed")
              shutdownAndExit(0)
            case Failure(err) =>
              logger.error(s"framework run failed: ${err.getMessage}", err)
              shutdownAndExit(1)
          })
    }
  }