in thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyExecuteStatementOperation.scala [80:131]
/**
 * Kicks off execution of this operation.
 *
 * The operation is first moved to `PENDING` and flagged as having a result set. After that:
 *  - when `runInBackground` is false, `execute()` is invoked directly on the calling thread and
 *    anything it throws propagates to the caller;
 *  - otherwise a task is submitted through `sessionManager.submitBackgroundOperation`, and the
 *    returned handle is stored with `setBackgroundHandle`. The task invokes `execute()` inside
 *    `doAs` of the UGI that was current when this method was called. Failures inside the task
 *    are recorded with `setOperationException` and logged rather than thrown.
 *
 * If the submission itself fails, the state is set to `ERROR` before the exception is thrown.
 *
 * @throws HiveSQLException if the background pool rejects the task
 */
override def runInternal(): Unit = {
  setState(OperationState.PENDING)
  // Flag the result set up front so that an asynchronous run is never seen without one.
  setHasResultSet(true)

  if (runInBackground) {
    // UGI current on the submitting thread; the background task re-enters it through doAs.
    val serviceUgi = UserGroupInformation.getCurrentUser

    // Work item for the background pool: it calls execute() from a different thread.
    val task = new Runnable() {
      override def run(): Unit = {
        val privilegedExecute = new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            try {
              execute()
            } catch {
              // Query failures are stored on the operation instead of escaping the action.
              case sqlFailure: HiveSQLException =>
                setOperationException(sqlFailure)
                error("Error running hive query: ", sqlFailure)
            }
          }
        }

        try {
          serviceUgi.doAs(privilegedExecute)
        } catch {
          // Anything else escaping doAs is wrapped so the operation still carries a
          // HiveSQLException.
          case failure: Exception =>
            setOperationException(new HiveSQLException(failure))
            error(s"Error running hive query as user : ${serviceUgi.getShortUserName}", failure)
        }
      }
    }

    try {
      // This submit blocks if no background threads are available to run this operation
      setBackgroundHandle(sessionManager.submitBackgroundOperation(task))
    } catch {
      case poolFull: RejectedExecutionException =>
        setState(OperationState.ERROR)
        throw new HiveSQLException("The background threadpool cannot accept" +
          " new task for execution, please retry the operation", poolFull)
      case NonFatal(cause) =>
        error("Error executing query in background", cause)
        setState(OperationState.ERROR)
        throw cause
    }
  } else {
    execute()
  }
}