override def run()

in gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala [75:131]


  /** Runs the analytics ETL as a local Spark job and, when an Elasticsearch index is
    * configured, posts the extracted rows to it.
    *
    * Steps performed:
    *  - builds the implicit `GerritEndpointConfig` from this command's options;
    *  - obtains a `SparkSession` (app name "analytics-etl", master "local");
    *  - swaps the current thread's context class loader for this class's loader while
    *    the job runs, restoring the previous one in `finally`;
    *  - builds the project stats, counts the rows and, for a configured index, saves
    *    them through `saveToEsWithAliasSwap`.
    *
    * Progress is written to `stdout`. Any `Throwable` raised by the job is printed to
    * `stderr` and handed to `die`.
    */
  override def run(): Unit = {
    implicit val config = GerritEndpointConfig(
      gerritConfig.getListenUrl(),
      prefix = Option(projectControl).map(_.getProject.getName),
      "",
      elasticIndex,
      beginDate,
      endDate,
      aggregate,
      emailAlias,
      ignoreSSLCert = Some(ignoreSSLCert)
    )

    implicit val spark: SparkSession = SparkSession
      .builder()
      .appName("analytics-etl")
      .master("local")
      // NOTE(review): "key"/"value" looks like a placeholder setting - confirm it can be dropped.
      .config("key", "value")
      .getOrCreate()

    // Kept implicit and lazy: they are only materialised if something in scope asks for them.
    implicit lazy val sc: SparkContext = spark.sparkContext
    implicit lazy val sql: SQLContext  = spark.sqlContext

    // Run the job with this class's loader as the thread context class loader;
    // the previous loader is put back in the finally block below.
    val prevClassLoader = Thread.currentThread().getContextClassLoader
    Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
    try {
      stdout.println(s"Starting new Spark job with parameters: $config")
      stdout.flush()
      val startTs = System.currentTimeMillis
      // cache() is requested because the result is consumed twice:
      // counted here and written to Elasticsearch below.
      val projectStats = buildProjectStats().cache()
      val numRows      = projectStats.count()

      config.elasticIndex.foreach { esIndex =>
        // Report the unwrapped index name rather than the Option's toString ("Some(...)").
        stdout.println(
          s"$numRows rows extracted. Posting Elasticsearch at '$esIndex/$indexType'"
        )
        stdout.flush()
        import com.gerritforge.analytics.infrastructure.ESSparkWriterImplicits.withAliasSwap
        import scala.concurrent.ExecutionContext.Implicits.global
        // NOTE(review): the outcome of the alias swap is only logged from these callbacks and
        // is not awaited, so "Job COMPLETED" below may be printed first - confirm intended.
        projectStats
          .saveToEsWithAliasSwap(esIndex, indexType)
          .futureAction
          .map(actionResponse => logger.info(s"Completed index swap ${actionResponse}"))
          .recover {
            // A failed swap is an error condition: log it as such and keep the cause.
            case exception: Exception =>
              logger.error(s"Index swap failed ${exception}", exception)
          }
      }

      val elapsedSecs = (System.currentTimeMillis - startTs) / 1000L
      stdout.println(s"Job COMPLETED in $elapsedSecs secs")
    } catch {
      case e: Throwable =>
        e.printStackTrace()
        stderr.println(s"Job FAILED: ${e.getClass} : ${e.getMessage}")
        // NOTE(review): whatever die(e) returns is discarded here - confirm that die raises
        // by itself rather than returning an exception the caller is expected to throw.
        die(e)
    } finally {
      Thread.currentThread().setContextClassLoader(prevClassLoader)
    }
  }