private InterpreterResult executeSql()

in jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java [766:918]


  private InterpreterResult executeSql(String sql,
      InterpreterContext context) throws InterpreterException {
    Connection connection = null;
    Statement statement;
    ResultSet resultSet = null;
    String paragraphId = context.getParagraphId();
    String noteId = context.getNoteId();
    String user = getUser(context);

    try {
      connection = getConnection(context);
    } catch (IllegalArgumentException e) {
      LOGGER.error("Cannot run " + sql, e);
      return new InterpreterResult(Code.ERROR, "Connection URL contains improper configuration");
    } catch (Exception e) {
      LOGGER.error("Fail to getConnection", e);
      try {
        closeDBPool(user);
      } catch (SQLException e1) {
        LOGGER.error("Cannot close DBPool for user: " + user , e1);
      }
      if (e instanceof SQLException) {
        return new InterpreterResult(Code.ERROR, e.getMessage());
      } else {
        return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
      }
    }
    if (connection == null) {
      return new InterpreterResult(Code.ERROR, "User's connection not found.");
    }

    try {
      List<String>  sqlArray = sqlSplitter.splitSql(sql);
      for (String sqlToExecute : sqlArray) {
        String sqlTrimmedLowerCase = sqlToExecute.trim().toLowerCase();
        if (sqlTrimmedLowerCase.startsWith("set ") ||
                sqlTrimmedLowerCase.startsWith("list ") ||
                sqlTrimmedLowerCase.startsWith("add ") ||
                sqlTrimmedLowerCase.startsWith("delete ")) {
          // some version of hive doesn't work with set statement with empty line ahead.
          // so we need to trim it first in this case.
          sqlToExecute = sqlToExecute.trim();
        }
        LOGGER.info("[{}|{}|{}] Execute sql: {}", user, noteId, paragraphId, sqlToExecute);
        statement = connection.createStatement();

        // fetch n+1 rows in order to indicate there's more rows available (for large selects)
        statement.setFetchSize(context.getIntLocalProperty("limit", getMaxResult()));
        statement.setMaxRows(context.getIntLocalProperty("limit", maxRows));

        if (statement == null) {
          return new InterpreterResult(Code.ERROR, "Prefix not found.");
        }

        try {
          getJDBCConfiguration(user).saveStatement(paragraphId, statement);

          String statementPrecode =
              getProperty(String.format(STATEMENT_PRECODE_KEY_TEMPLATE, DEFAULT_KEY));

          if (StringUtils.isNotBlank(statementPrecode)) {
            statement.execute(statementPrecode);
          }

          // start hive monitor thread if it is hive jdbc
          String jdbcURL = getJDBCConfiguration(user).getProperty().getProperty(URL_KEY);
          String driver =
                  getJDBCConfiguration(user).getProperty().getProperty(DRIVER_KEY);
          if (jdbcURL != null && driver != null) {
            if (driver.equals("org.apache.hive.jdbc.HiveDriver") &&
                jdbcURL.startsWith("jdbc:hive2://")) {
              HiveUtils.startHiveMonitorThread(statement, context,
                    Boolean.parseBoolean(getProperty("hive.log.display", "true")), this);
            } else if (driver.equals("org.apache.kyuubi.jdbc.KyuubiHiveDriver") &&
                (jdbcURL.startsWith("jdbc:kyuubi://") || jdbcURL.startsWith("jdbc:hive2://"))) {
              KyuubiUtils.startMonitorThread(connection, statement, context,
                  Boolean.parseBoolean(getProperty("kyuubi.log.display", "true")), this);
            }
          }
          boolean isResultSetAvailable = statement.execute(sqlToExecute);
          getJDBCConfiguration(user).setConnectionInDBDriverPoolSuccessful();
          if (isResultSetAvailable) {
            resultSet = statement.getResultSet();

            // Regards that the command is DDL.
            if (isDDLCommand(statement.getUpdateCount(),
                resultSet.getMetaData().getColumnCount())) {
              context.out.write("%text Query executed successfully.\n");
            } else {
              String template = context.getLocalProperties().get("template");
              if (!StringUtils.isBlank(template)) {
                resultSet.next();
                SingleRowInterpreterResult singleRowResult =
                        new SingleRowInterpreterResult(getFirstRow(resultSet), template, context);

                if (isFirstRefreshMap.get(context.getParagraphId())) {
                  context.out.write(singleRowResult.toAngular());
                  context.out.write("\n%text ");
                  context.out.flush();
                  isFirstRefreshMap.put(context.getParagraphId(), false);
                }
                singleRowResult.pushAngularObjects();

              } else {
                String results = getResults(resultSet,
                        !containsIgnoreCase(sqlToExecute, EXPLAIN_PREDICATE));
                context.out.write(results);
                context.out.write("\n%text ");
                context.out.flush();
              }
            }
          } else {
            // Response contains either an update count or there are no results.
            int updateCount = statement.getUpdateCount();
            context.out.write("\n%text " +
                "Query executed successfully. Affected rows : " +
                    updateCount + "\n");
          }
        } finally {
          if (resultSet != null) {
            try {
              resultSet.close();
            } catch (SQLException e) { /*ignored*/ }
          }
          if (statement != null) {
            try {
              statement.close();
            } catch (SQLException e) { /*ignored*/ }
          }
        }
      }
    } catch (Throwable e) {
      LOGGER.error("Cannot run " + sql, e);
      if (e instanceof SQLException) {
        return new InterpreterResult(Code.ERROR,  e.getMessage());
      } else {
        return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
      }
    } finally {
      //In case user ran an insert/update/upsert statement
      if (connection != null) {
        try {
          if (!connection.getAutoCommit()) {
            connection.commit();
          }
          connection.close();
        } catch (SQLException e) { /*ignored*/ }
      }
      getJDBCConfiguration(user).removeStatement(paragraphId);
    }

    return new InterpreterResult(Code.SUCCESS);
  }