private def executeHQL()

in linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala [240:361]


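  /**
   * Executes a single HQL statement through the given HiveDriverProxy. For non-IDriver drivers
   * the statement is compiled first so the number of MR jobs can be reported; the statement is
   * re-run whenever Hive signals that the command needs a retry, and any result set is written
   * back through the engineExecutorContext.
   */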
  private def executeHQL(realCode: String, driver: HiveDriverProxy): ExecuteResponse = {
    var needRetry: Boolean = true
    var tryCount: Int = 0
    var hasResult: Boolean = false
    var rows: Int = 0
    var columnCount: Int = 0
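    // Retry loop: runs once normally and repeats only when Hive throws a
    // command-needs-retry exception (handled in the catch block below).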
    while (needRetry) {
      needRetry = false
      driver.setTryCount(tryCount + 1)
      val startTime = System.currentTimeMillis()
      try {
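        // Non-IDriver drivers are compiled explicitly first so the query plan (and the number
        // of MR jobs) can be inspected; IDriver implementations are simply run as-is.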
        val hiveResponse: CommandProcessorResponse =
          if (!HiveDriverProxy.isIDriver(driver.getDriver())) {
            var compileRet = -1
            Utils.tryCatch {
              compileRet = driver.compile(realCode)
              logger.info(
                s"Driver compiled realCode:\n${realCode}\nfinished, status: ${compileRet}"
              )
              if (0 != compileRet) {
                logger.warn(s"Failed to compile realCode:\n${realCode}\nerror status: ${compileRet}")
                throw HiveQueryFailedException(
                  COMPILE_HIVE_QUERY_ERROR.getErrorCode,
                  COMPILE_HIVE_QUERY_ERROR.getErrorDesc
                )
              }
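              // Count the MapReduce tasks in the compiled plan so the job count can be reported.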
              val queryPlan = driver.getPlan()
              val numberOfJobs = Utilities.getMRTasks(queryPlan.getRootTasks).size
              numberOfMRJobs = numberOfJobs
              logger.info(s"There are ${numberOfMRJobs} MR jobs to run.")
            } {
              case e: Exception => logger.warn("Failed to obtain the Hive query plan", e)
              case t: Throwable => logger.warn("Failed to obtain the Hive query plan", t)
            }
            if (numberOfMRJobs > 0) {
              engineExecutorContext.appendStdout(s"Your Hive SQL has $numberOfMRJobs MR job(s) to run")
            }
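            // Stop early if the executing thread has been interrupted and the task must be terminated.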
            if (thread.isInterrupted) {
              logger.error(
                "The thread of execution has been interrupted and the task should be terminated"
              )
              return ErrorExecuteResponse(
                "The thread of execution has been interrupted and the task should be terminated",
                null
              )
            }
            driver.run(realCode, compileRet == 0)
          } else {
            driver.run(realCode)
          }
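        // A non-zero response code means Hive failed to execute the statement.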
        if (hiveResponse.getResponseCode != 0) {
          LOG.error("Hive query failed, response code is {}", hiveResponse.getResponseCode)
          return ErrorExecuteResponse(hiveResponse.getErrorMessage, hiveResponse.getException)
        }
        engineExecutorContext.appendStdout(
          s"Time taken: ${ByteTimeUtils.msDurationToString(System.currentTimeMillis() - startTime)}, begin to fetch results."
        )
        LOG.info(
          s"$getId >> Time taken: ${ByteTimeUtils.msDurationToString(System.currentTimeMillis() - startTime)}, begin to fetch results."
        )

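        // Prefer the schema attached to the response; fall back to the driver's schema.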
        val fieldSchemas =
          if (hiveResponse.getSchema != null) hiveResponse.getSchema.getFieldSchemas
          else if (driver.getSchema != null) {
            driver.getSchema.getFieldSchemas
          } else {
            throw HiveQueryFailedException(
              GET_FIELD_SCHEMAS_ERROR.getErrorCode,
              GET_FIELD_SCHEMAS_ERROR.getErrorDesc
            )
          }
        LOG.debug("fieldSchemas are " + fieldSchemas)
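        // Statements that produce no result set are completed without fetching rows.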
        if (fieldSchemas == null || isNoResultSql(realCode)) {
          // IOUtils.closeQuietly(resultSetWriter)
          numberOfMRJobs = -1
          singleCodeCompleted.set(true)
          onComplete()
          singleSqlProgressMap.clear()
          return SuccessExecuteResponse()
        }
        // get column data
        val metaData: TableMetaData =
          getResultMetaData(fieldSchemas, engineExecutorContext.getEnableResultsetMetaWithTableName)
        // send result
        rows = sendResultSet(engineExecutorContext, driver, metaData)
        columnCount = if (fieldSchemas != null) fieldSchemas.size() else 0
        hasResult = true
      } catch {
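        // Hive signalled that the command should be retried: reset progress and loop again.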
        case e if HiveDriverProxy.isCommandNeedRetryException(e) =>
          tryCount += 1
          needRetry = true
          HiveProgressHelper.clearHiveProgress()
          onComplete()
          singleSqlProgressMap.clear()
          clearCurrentProgress()
          HiveProgressHelper.storeSingleSQLProgress(0.0f)
          LOG.warn("Hive requested a retry, re-running the query ...")
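        // Any other failure terminates this statement with an error response.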
        case t: Throwable =>
          LOG.error("Hive query failed, reason:", t)
          HiveProgressHelper.clearHiveProgress()
          clearCurrentProgress()
          HiveProgressHelper.storeSingleSQLProgress(0.0f)
          singleCodeCompleted.set(true)
          numberOfMRJobs = -1
          onComplete()
          singleSqlProgressMap.clear()
          return ErrorExecuteResponse(t.getMessage, t)
      }
    }
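    // Report how much data was fetched once execution has finished.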
    if (hasResult) {
      engineExecutorContext.appendStdout(s"Fetched $columnCount col(s) : $rows row(s) in hive")
      LOG.info(s"$getId >> Fetched $columnCount col(s) : $rows row(s) in hive")
    }
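    // Final cleanup: reset progress tracking and mark this statement as completed.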
    clearCurrentProgress()
    HiveProgressHelper.clearHiveProgress()
    HiveProgressHelper.storeSingleSQLProgress(0.0f)
    singleCodeCompleted.set(true)
    numberOfMRJobs = -1
    onComplete()
    singleSqlProgressMap.clear()
    SuccessExecuteResponse()
  }