app/services/BigQueryService.scala (88 lines of code) (raw):
package services
import com.google.api.gax.core.FixedCredentialsProvider
import com.google.auth.oauth2.GoogleCredentials
import com.google.cloud.RetryOption
import com.google.cloud.bigquery.{BigQuery, BigQueryOptions, FieldValueList, JobInfo, QueryJobConfiguration, TableResult}
import com.typesafe.scalalogging.LazyLogging
import models.BigQueryResult
import zio.blocking.effectBlocking
import zio.{ZEnv, ZIO}
import java.io.ByteArrayInputStream
import scala.jdk.CollectionConverters._
case class BigQueryError(message: String) extends Throwable
class BigQueryService(bigQuery: BigQuery) extends LazyLogging {
def buildQuery(testName: String, channel: String, stage: String): String = {
val channelInQuery = channel match {
case "Epic" => "ACQUISITIONS_EPIC"
case "Banner1" => "ACQUISITIONS_ENGAGEMENT_BANNER"
case "Banner2" => "ACQUISITIONS_SUBSCRIPTIONS_BANNER"
}
s"""SELECT
ab.name AS test_name,
ab.variant AS variant_name,
component_type,
SUM(acquisition_ltv_3_year) AS ltv3,
FROM `datatech-platform-prod.reader_revenue.fact_holding_acquisition`
CROSS JOIN UNNEST(ab_tests) AS ab
WHERE ab.name= '$testName'
AND component_type ='$channelInQuery'
GROUP BY 1,2,3
"""
}
def runQuery(queryString: String): ZIO[ZEnv, BigQueryError, TableResult] =
effectBlocking {
val queryConfig = QueryJobConfiguration
.newBuilder(queryString)
.setUseLegacySql(false)
.build()
var queryJob = bigQuery.create(JobInfo.of(queryConfig))
queryJob = queryJob.waitFor(RetryOption.maxAttempts(0))
Option(queryJob)
}.flatMap {
case None => ZIO.fail(BigQueryError("Cannot create query job"))
case Some(job) =>
Option(job.getStatus.getError) match {
case None => ZIO.succeed(
job.getQueryResults()
)
case Some(error) => ZIO.fail(BigQueryError("Cannot retrieve results"))
}
}.mapError(error => {
logger.error(s"Error running query: $error")
BigQueryError(error.toString)
})
def toBigQueryResult(row: FieldValueList): BigQueryResult = {
val bigQueryResult = BigQueryResult(
row.get("test_name").getStringValue,
row.get("variant_name").getStringValue,
row.get("component_type").getStringValue,
row.get("ltv3").getDoubleValue
)
logger.debug(bigQueryResult.toString)
bigQueryResult
}
def getBigQueryResult(result: TableResult): List[BigQueryResult] = {
result.getValues.asScala.map(toBigQueryResult).toList
}
def getLTV3Data(testName: String, channel: String, stage: String): ZIO[ZEnv, BigQueryError, List[BigQueryResult]] = {
val query = buildQuery(testName, channel, stage)
logger.info(s"Query: $query");
runQuery(query).map(result => getBigQueryResult(result))
}
}
object BigQueryService {
def apply(stage: String, jsonCredentials: String): BigQueryService = {
val wifCredentialsConfig = GoogleCredentials.fromStream(
new ByteArrayInputStream(jsonCredentials.getBytes())
)
val credentials = FixedCredentialsProvider.create(wifCredentialsConfig).getCredentials
val projectId = s"datatech-platform-${stage.toLowerCase}"
val bigQuery = BigQueryOptions
.newBuilder()
.setCredentials(credentials)
.setProjectId(projectId)
.build()
.getService
new BigQueryService(bigQuery)
}
}