in tools/bigquery-hive-external-table-loader/src/main/scala/com/google/cloud/bqhiveloader/SparkJobs.scala [158:439]
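/* Loads the selected Hive partitions into the destination BigQuery table.
 * Each partition is exposed to BigQuery as an external table and loaded with
 * query jobs; when the generated SQL exceeds the single-query size limit the
 * job falls back to temporary views or a temporary staging table.
 */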
def loadPartitions(c: Config, table: TableMetadata, partitions: Seq[Partition]): Unit = {
for {
keytab <- c.krbKeyTab
principal <- c.krbPrincipal
} Kerberos.configureJaas(keytab, principal)
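// Resolve service account key file paths (each falls back to the shared bqKeyFile) and read their contents.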
val writeKeyPath = c.bqWriteKeyFile.orElse(c.bqKeyFile)
val createKeyPath = c.bqCreateTableKeyFile.orElse(c.bqKeyFile)
val gcsKeyPath = c.gcsKeyFile.orElse(c.bqKeyFile)
val writeKey = readLocalOrSparkFile(writeKeyPath)
val createKey = readLocalOrSparkFile(createKeyPath)
val gcsKey = readLocalOrSparkFile(gcsKeyPath)
writeKeyPath.foreach{p =>
require(writeKey.isDefined, s"unable to load BigQuery write key from $p")
logger.info(s"loaded BigQuery write key from $p")
}
createKeyPath.foreach{p =>
require(createKey.isDefined, s"unable to load BigQuery create key from $p")
logger.info(s"loaded BigQuery create key from $p")
}
gcsKeyPath.foreach{p =>
require(gcsKey.isDefined, s"unable to load GCS key from $p")
logger.info(s"loaded GCS key from $p")
}
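// Build credentials from the provided key files, falling back to Application Default Credentials when none was given.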
val bqCreateCredentials: GoogleCredentials =
createKey match {
case Some(bytes) =>
GoogleCredentials
.fromStream(new ByteArrayInputStream(bytes))
.createScoped(BigQueryScope)
case _ =>
GoogleCredentials.getApplicationDefault
}
val bqWriteCredentials: GoogleCredentials =
writeKey match {
case Some(bytes) =>
GoogleCredentials
.fromStream(new ByteArrayInputStream(bytes))
.createScoped(BigQueryScope)
case _ =>
GoogleCredentials.getApplicationDefault
}
val storageCreds: GoogleCredentials =
gcsKey match {
case Some(bytes) =>
GoogleCredentials
.fromStream(new ByteArrayInputStream(bytes))
.createScoped(StorageScope)
case _ =>
GoogleCredentials.getApplicationDefault
}
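// Retry/timeout settings shared by all clients: long-running jobs get a 24h total timeout and 8h per-RPC timeout.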
val retrySettings = RetrySettings.newBuilder()
.setMaxAttempts(0)
.setTotalTimeout(Duration.ofHours(24))
.setInitialRetryDelay(Duration.ofSeconds(30))
.setRetryDelayMultiplier(2.0d)
.setMaxRetryDelay(Duration.ofSeconds(300))
.setInitialRpcTimeout(Duration.ofHours(8))
.setRpcTimeoutMultiplier(1.0d)
.setMaxRpcTimeout(Duration.ofHours(8))
.setJittered(false)
.build()
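// Separate BigQuery clients: create credentials for external tables and views, write credentials for load/query jobs.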
val bigqueryCreate: BigQuery = BigQueryOptions.newBuilder()
.setLocation(c.bqLocation)
.setCredentials(bqCreateCredentials)
.setProjectId(c.bqProject)
.setHeaderProvider(FixedHeaderProvider.create("user-agent", BQHiveLoader.UserAgent))
.setRetrySettings(retrySettings)
.build()
.getService
val bigqueryWrite: BigQuery = BigQueryOptions.newBuilder()
.setLocation(c.bqLocation)
.setCredentials(bqWriteCredentials)
.setProjectId(c.bqProject)
.setHeaderProvider(FixedHeaderProvider.create("user-agent", BQHiveLoader.UserAgent))
.setRetrySettings(retrySettings)
.build()
.getService
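// Lower-level Bigquery API client used by RangePartitioningUtil, plus a GCS client used when defining the external tables.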
val bql: com.google.api.services.bigquery.Bigquery = RangePartitioningUtil.bq(bqCreateCredentials)
val gcs: Storage = StorageOptions.newBuilder()
.setCredentials(storageCreds)
.setProjectId(c.bqProject)
.setHeaderProvider(FixedHeaderProvider.create("user-agent", BQHiveLoader.UserAgent))
.setRetrySettings(retrySettings)
.build()
.getService
// TODO detect from metadata
val storageFormat = c.hiveStorageFormat
.map(ExternalTableManager.parseStorageFormat)
.getOrElse(Orc)
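// Destination schema: apply the drop and keep-list filters first, then the column renames.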
val filteredSchema = StructType(table.schema
.filterNot{x => c.dropColumns.contains(x.name)}
.filter{x => c.keepColumns.contains(x.name) || c.keepColumns.isEmpty}
.map{x =>
c.renameColumns.find(_._1 == x.name) match {
case Some((_, newName)) =>
x.copy(name = newName)
case _ =>
x
}
})
/* Create BigQuery table to be loaded */
NativeTableManager.createTableIfNotExists(c.bqProject, c.bqDataset, c.bqTable, c, filteredSchema, bigqueryWrite, bql)
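// Define one external table per partition and record whether its ORC columns use positional names that must be renamed.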
val externalTables: Seq[(Partition, TableInfo, Boolean)] = partitions.map{part =>
val extTable = createExternalTable(c.bqProject, c.tempDataset, c.bqTable,
table, part, storageFormat, bigqueryCreate, gcs, c.dryRun)
val renameOrcCols = if (!c.dryRun) {
val creation = waitForCreation(extTable.getTableId, timeoutMillis = 120000L, bigqueryCreate)
hasOrcPositionalColNames(creation)
} else true
(part, extTable, renameOrcCols)
}
/* We generate SQL for each of the external tables
* Each one is a distinct query because partition values
* may be selected as a literal value specific to that partition
*/
val sql: Seq[String] = externalTables.map{ case (partition, extTable, renameOrcCols) =>
val extTableId = extTable.getTableId
val schema = table.schema
SQLGenerator.generateSelectFromExternalTable(
extTable = extTableId,
schema = schema,
partition = partition,
unusedColumnName = c.unusedColumnName,
formats = c.partColFormats.toMap,
renameOrcCols = renameOrcCols,
dropColumns = c.dropColumns,
keepColumns = c.keepColumns)
}
/* Our load operation needs to be atomic to prevent users from
* receiving partial results.
* The simplest way to achieve this is to union all the queries.
*/
val unionSQL = sql.mkString("\nUNION ALL\n")
logger.info(s"Generated SQL with length ${unionSQL.length}")
if (unionSQL.length < MaxSQLLength) {
/* Only proceed with this method if we are under the 1MB SQL limit. */
logger.debug("Submitting Query:\n" + unionSQL)
val destTableId = TableId.of(c.bqProject, c.bqDataset, c.bqTable)
val queryJob = ExternalTableManager.runQuery(unionSQL, destTableId, c.bqProject,
c.bqLocation, c.dryRun, c.bqOverwrite, c.bqBatch, bigqueryWrite)
logger.info(s"Waiting for Job")
scala.Option(queryJob.waitFor(RetryOption.totalTimeout(Duration.ofHours(8)))) match {
case None =>
val msg = s"Job doesn't exist"
logger.error(msg)
throw new RuntimeException(msg)
case Some(j) if j.getStatus.getError != null =>
val msg = s"Job failed with message: " + j.getStatus.getError.getMessage
logger.error(msg)
throw new RuntimeException(msg)
case _ =>
}
logger.info(s"Finished loading partitions")
} else if (c.useTempViews) {
/* If we are over the limit we have one more method to squeeze
* more SQL into a single Query Job by splitting our union all
* into multiple views, which we will then select from.
* This lets us use the 12MB SQL limit for resolved SQL.
*/
logger.info("Creating temporary views")
val t = System.currentTimeMillis() / 1000
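// Unix timestamp suffix keeps the temporary view names unique across runs.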
val views = SQLGenerator.createViews(sql)
.zipWithIndex
.map { case (query, i) =>
val viewName = s"vw_${c.bqTable}_${i}_$t"
val tableId = TableId.of(c.bqProject, c.tempDataset, viewName)
bigqueryCreate.create(TableInfo.of(tableId, ViewDefinition.of(query)))
tableId
}
logger.info("Finished creating temporary views")
val viewSQL = SQLGenerator.generateSelectFromViews(views, filteredSchema)
logger.debug("Submitting Query:\n" + viewSQL)
val destTableId = TableId.of(c.bqProject, c.bqDataset, c.bqTable)
val queryJob = ExternalTableManager.runQuery(viewSQL, destTableId, c.bqProject,
c.bqLocation, c.dryRun, c.bqOverwrite, c.bqBatch, bigqueryWrite)
logger.info(s"Waiting for Job")
scala.Option(queryJob.waitFor(RetryOption.totalTimeout(Duration.ofHours(8)))) match {
case None =>
val msg = s"Job doesn't exist"
logger.error(msg)
throw new RuntimeException(msg)
case Some(j) if j.getStatus.getError != null =>
val msg = s"Job failed with message: " + j.getStatus.getError.getMessage
logger.error(msg)
throw new RuntimeException(msg)
case _ =>
}
logger.info(s"Finished loading partitions")
logger.info(s"Deleting temporary views")
views.foreach{ viewId =>
Try(bigqueryCreate.delete(viewId))
.failed.foreach{e => logger.error(s"Failed to delete ${viewId.getDataset}.${viewId.getTable}: ${e.getMessage}")}
}
logger.info(s"Finished deleting temporary views")
} else if (c.useTempTable) {
/* It's possible that the table has so many partitions and/or
* columns that the resolved SQL exceeds 12MB.
* We could take the risk of using "SELECT *" instead of naming
* columns, but as currently implemented we don't do that
* and simply fall back to the original method which appends
* each partition into a temp table before running an atomic select
* query job to load the temp table partitions into the destination table.
*/
val tmpTableName = c.bqTable + "_" + c.refreshPartition.getOrElse("") + "_tmp_" + (System.currentTimeMillis()/1000L).toString
logger.info("Loading partitions into temporary table " + tmpTableName)
NativeTableManager.createTableIfNotExists(c.bqProject, c.tempDataset, tmpTableName, c, table.schema, bigqueryWrite, bql, Some(Duration.ofHours(6).toMillis))
logger.info("Loading partitions from external tables")
val tmpTableId = TableId.of(c.bqProject, c.tempDataset, tmpTableName)
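// Submit one query job per partition that selects from its external table into the temporary table.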
val jobs = externalTables.map{ case (partition, extTable, renameOrcCols) =>
val extTableId = extTable.getTableId
val schema = table.schema
val loadJob = ExternalTableManager.loadPart(
destTableId = tmpTableId,
schema = schema,
partition = partition,
extTableId = extTableId,
unusedColumnName = c.unusedColumnName,
partColFormats = c.partColFormats.toMap,
bigqueryWrite = bigqueryWrite,
batch = c.bqBatch,
overwrite = c.bqOverwrite,
renameOrcCols = renameOrcCols,
dryRun = c.dryRun,
dropColumns = c.dropColumns,
keepColumns = c.keepColumns,
renameColumns = c.renameColumns.toMap)
logger.info(s"Created QueryJob ${loadJob.getJobId.getJob}")
(partition, loadJob)
}
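// Wait for every partition load job and fail on the first error.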
jobs.foreach{ case (partition, loadJob) =>
scala.Option(loadJob.waitFor(RetryOption.totalTimeout(Duration.ofDays(1)))) match {
case Some(j) if j.getStatus.getError != null =>
val msg = s"Job ${j.getJobId.getJob} failed with message: ${j.getStatus.getError}\nsql:\n$sql\n\npartition:\n$partition\n\nschema:\n${table.schema.map(_.toString).mkString("\n")}"
throw new RuntimeException(msg)
case None =>
val msg = s"Job ${loadJob.getJobId.getJob} doesn't exist"
throw new RuntimeException(msg)
case _ =>
}
}
logger.info("Finished loading " + tmpTableName)
NativeTableManager.copyOnto(c.bqProject, c.tempDataset, tmpTableName,
c.bqProject, c.bqDataset, c.bqTable, destPartition = c.refreshPartition,
bq = bigqueryWrite, dryRun = c.dryRun, batch = c.bqBatch)
logger.info(s"Finished loading ${c.bqTable}")
} else {
val msg = s"Generated SQL with length ${unionSQL.length} but useTempTable is set to false. Reduce number of selected partitions or add --useTempTable flag."
throw new RuntimeException(msg)
}
}