in linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala [240:361]
/**
 * Runs one HQL statement through `driver` and forwards its result set, if any, to the engine
 * executor context.
 *
 * The statement is re-run for as long as the driver raises an exception recognised by
 * `HiveDriverProxy.isCommandNeedRetryException`; the try count handed to the driver is
 * incremented on every attempt.
 *
 * Every terminal path (success, interruption, non-zero response code, unexpected throwable)
 * resets the per-statement state (`numberOfMRJobs`, `singleCodeCompleted`,
 * `singleSqlProgressMap`) and invokes `onComplete()`, so a failed statement cannot leak stale
 * values into the next one.
 *
 * @param realCode
 *   the statement text handed to `driver.compile` / `driver.run`
 * @param driver
 *   proxy around the Hive driver used to compile and run the statement
 * @return
 *   `SuccessExecuteResponse` when the statement ran (with or without a result set);
 *   `ErrorExecuteResponse` when the executing thread was interrupted, the driver returned a
 *   non-zero response code, or any non-retryable throwable was raised
 */
private def executeHQL(realCode: String, driver: HiveDriverProxy): ExecuteResponse = {

  // Zeroes the progress trackers kept for the statement currently running.
  def resetProgress(): Unit = {
    clearCurrentProgress()
    HiveProgressHelper.clearHiveProgress()
    HiveProgressHelper.storeSingleSQLProgress(0.0f)
  }

  // Flags the statement as finished and drops its MR-job bookkeeping.
  def markStatementCompleted(): Unit = {
    singleCodeCompleted.set(true)
    numberOfMRJobs = -1
    onComplete()
    singleSqlProgressMap.clear()
  }

  // Full cleanup shared by the success tail and by every error exit.
  def finishStatement(): Unit = {
    resetProgress()
    markStatementCompleted()
  }

  val interruptedMessage =
    "The thread of execution has been interrupted and the task should be terminated"

  var needRetry: Boolean = true
  var tryCount: Int = 0
  var hasResult: Boolean = false
  var rows: Int = 0
  var columnCount: Int = 0

  while (needRetry) {
    needRetry = false
    driver.setTryCount(tryCount + 1)
    val startTime = System.currentTimeMillis()
    try {
      val hiveResponse: CommandProcessorResponse =
        if (!HiveDriverProxy.isIDriver(driver.getDriver())) {
          // Compile up front so the number of MR jobs in the plan can be reported. A failure
          // here is only logged; the statement is still handed to driver.run below.
          var compileRet = -1
          Utils.tryCatch {
            compileRet = driver.compile(realCode)
            logger.info(
              s"driver compile realCode : \n ${realCode} \n finished, status : ${compileRet}"
            )
            if (0 != compileRet) {
              logger.warn(s"compile realCode : \n ${realCode} \n error status : ${compileRet}")
              throw HiveQueryFailedException(
                COMPILE_HIVE_QUERY_ERROR.getErrorCode,
                COMPILE_HIVE_QUERY_ERROR.getErrorDesc
              )
            }
            val queryPlan = driver.getPlan()
            val numberOfJobs = Utilities.getMRTasks(queryPlan.getRootTasks).size
            numberOfMRJobs = numberOfJobs
            logger.info(s"there are ${numberOfMRJobs} jobs.")
          } {
            case t: Throwable => logger.warn("obtain hive execute query plan failed,", t)
          }
          if (numberOfMRJobs > 0) {
            engineExecutorContext.appendStdout(s"Your hive sql has $numberOfMRJobs MR jobs to do")
          }
          if (thread.isInterrupted) {
            logger.error(interruptedMessage)
            // Reset state before bailing out, like every other terminal path.
            finishStatement()
            return ErrorExecuteResponse(interruptedMessage, null)
          }
          // Second argument is true only when compile returned 0 -- presumably it lets the
          // driver skip recompiling; confirm against HiveDriverProxy.run.
          driver.run(realCode, compileRet == 0)
        } else {
          driver.run(realCode)
        }

      if (hiveResponse.getResponseCode != 0) {
        LOG.error("Hive query failed, response code is {}", hiveResponse.getResponseCode)
        // A failure reported through the response code must clean up exactly like a failure
        // reported through an exception; otherwise numberOfMRJobs and the progress state
        // would survive into the next statement.
        finishStatement()
        return ErrorExecuteResponse(hiveResponse.getErrorMessage, hiveResponse.getException)
      }

      // Compute the elapsed time once so stdout and the log report the same value.
      val elapsed = ByteTimeUtils.msDurationToString(System.currentTimeMillis() - startTime)
      engineExecutorContext.appendStdout(s"Time taken: $elapsed, begin to fetch results.")
      LOG.info(s"$getId >> Time taken: $elapsed, begin to fetch results.")

      // Prefer the schema carried by the response, fall back to the driver's schema.
      val fieldSchemas =
        if (hiveResponse.getSchema != null) {
          hiveResponse.getSchema.getFieldSchemas
        } else if (driver.getSchema != null) {
          driver.getSchema.getFieldSchemas
        } else {
          throw HiveQueryFailedException(
            GET_FIELD_SCHEMAS_ERROR.getErrorCode,
            GET_FIELD_SCHEMAS_ERROR.getErrorDesc
          )
        }
      LOG.debug(s"fieldSchemas are $fieldSchemas")

      if (fieldSchemas == null || isNoResultSql(realCode)) {
        // Nothing to fetch: finish the statement without emitting a result set.
        markStatementCompleted()
        return SuccessExecuteResponse()
      }

      // get column data
      val metaData: TableMetaData =
        getResultMetaData(fieldSchemas, engineExecutorContext.getEnableResultsetMetaWithTableName)
      // send result
      rows = sendResultSet(engineExecutorContext, driver, metaData)
      // fieldSchemas is known to be non-null here (the null case returned above).
      columnCount = fieldSchemas.size()
      hasResult = true
    } catch {
      case e if HiveDriverProxy.isCommandNeedRetryException(e) =>
        // Go round the loop again; singleCodeCompleted is left untouched because the
        // statement has not finished yet.
        tryCount += 1
        needRetry = true
        HiveProgressHelper.clearHiveProgress()
        onComplete()
        singleSqlProgressMap.clear()
        clearCurrentProgress()
        HiveProgressHelper.storeSingleSQLProgress(0.0f)
        LOG.warn("Retry hive query with a different approach...")
      case t: Throwable =>
        LOG.error("query failed, reason : ", t)
        finishStatement()
        return ErrorExecuteResponse(t.getMessage, t)
    }
  }

  if (hasResult) {
    engineExecutorContext.appendStdout(s"Fetched $columnCount col(s) : $rows row(s) in hive")
    LOG.info(s"$getId >> Fetched $columnCount col(s) : $rows row(s) in hive")
  }
  finishStatement()
  SuccessExecuteResponse()
}