in gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala [75:131]
/** Entry point of the git-commits ETL command.
  *
  * Builds a [[GerritEndpointConfig]] from the command-line options, spins up a
  * local Spark session, extracts per-project commit statistics and, when an
  * Elasticsearch index is configured, posts them with an alias swap.
  * Any non-fatal failure is reported on stderr and terminates the command via
  * `die`; fatal JVM errors (OOM etc.) are allowed to propagate.
  */
override def run(): Unit = {
  import scala.util.control.NonFatal

  // Implicit definitions carry an explicit type annotation by convention.
  implicit val config: GerritEndpointConfig = GerritEndpointConfig(
    gerritConfig.getListenUrl(),
    prefix = Option(projectControl).map(_.getProject.getName),
    // NOTE(review): positional "" after a named argument relies on it sitting at
    // exactly the third parameter position of GerritEndpointConfig — consider
    // naming this argument too to make the call order-independent.
    "",
    elasticIndex,
    beginDate,
    endDate,
    aggregate,
    emailAlias,
    ignoreSSLCert = Some(ignoreSSLCert)
  )

  implicit val spark: SparkSession = SparkSession
    .builder()
    .appName("analytics-etl")
    .master("local")
    // NOTE(review): "key"/"value" looks like template leftover with no effect on
    // the job — confirm and remove.
    .config("key", "value")
    .getOrCreate()

  implicit lazy val sc: SparkContext = spark.sparkContext
  implicit lazy val sql: SQLContext  = spark.sqlContext

  // Spark must resolve classes through the plugin's class loader; the previous
  // loader is restored in `finally` so the calling thread is left untouched.
  val prevClassLoader = Thread.currentThread().getContextClassLoader
  Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
  try {
    stdout.println(s"Starting new Spark job with parameters: $config")
    stdout.flush()
    val startTs      = System.currentTimeMillis
    val projectStats = buildProjectStats().cache()
    val numRows      = projectStats.count()

    config.elasticIndex.foreach { esIndex =>
      // Fix: interpolate the unwrapped index name; the previous message used
      // config.elasticIndex (an Option) and printed "Some(index)/type".
      stdout.println(
        s"$numRows rows extracted. Posting Elasticsearch at '$esIndex/$indexType'"
      )
      stdout.flush()
      import com.gerritforge.analytics.infrastructure.ESSparkWriterImplicits.withAliasSwap
      import scala.concurrent.ExecutionContext.Implicits.global
      // NOTE(review): the alias-swap future is fire-and-forget — "Job COMPLETED"
      // below may print before the swap finishes; await it if ordering matters.
      projectStats
        .saveToEsWithAliasSwap(esIndex, indexType)
        .futureAction
        .map(actionResponse => logger.info(s"Completed index swap ${actionResponse}"))
        .recover { case exception: Exception => logger.info(s"Index swap failed ${exception}") }
    }

    val elapsedTs = (System.currentTimeMillis - startTs) / 1000L
    stdout.println(s"Job COMPLETED in $elapsedTs secs")
  } catch {
    // Only non-fatal throwables are handled here so that fatal JVM errors
    // (OutOfMemoryError, InterruptedException, ...) still propagate.
    case NonFatal(e) =>
      e.printStackTrace()
      stderr.println(s"Job FAILED: ${e.getClass} : ${e.getMessage}")
      die(e)
  } finally {
    Thread.currentThread().setContextClassLoader(prevClassLoader)
  }
}