in jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java [766:918]
/**
 * Splits {@code sql} into individual statements, runs each one on the connection obtained
 * for the current interpreter context, and streams the output to {@code context.out}.
 *
 * <p>Failures while obtaining the connection or while running a statement are logged and
 * reported as an {@code ERROR} result rather than thrown: the message only for a
 * {@link SQLException}, the full stack trace for anything else. Once a connection has been
 * obtained it is always committed (when auto-commit is off) and closed before returning,
 * and the statement registered for the paragraph is removed.
 *
 * @param sql     the raw paragraph text; may contain several statements
 * @param context the interpreter context supplying the paragraph/note ids, local properties
 *                and the output stream
 * @return {@code SUCCESS} (with no message; output has already been written to
 *         {@code context.out}) when every statement ran, otherwise an {@code ERROR} result
 * @throws InterpreterException declared by the signature; not thrown directly by this body
 *         (NOTE(review): may still propagate from helpers called outside the catch blocks)
 */
private InterpreterResult executeSql(String sql,
    InterpreterContext context) throws InterpreterException {
  Connection connection = null;
  String paragraphId = context.getParagraphId();
  String noteId = context.getNoteId();
  String user = getUser(context);

  try {
    connection = getConnection(context);
  } catch (IllegalArgumentException e) {
    LOGGER.error("Cannot run " + sql, e);
    return new InterpreterResult(Code.ERROR, "Connection URL contains improper configuration");
  } catch (Exception e) {
    LOGGER.error("Fail to getConnection", e);
    try {
      closeDBPool(user);
    } catch (SQLException e1) {
      LOGGER.error("Cannot close DBPool for user: " + user , e1);
    }
    if (e instanceof SQLException) {
      return new InterpreterResult(Code.ERROR, e.getMessage());
    } else {
      return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
    }
  }
  if (connection == null) {
    return new InterpreterResult(Code.ERROR, "User's connection not found.");
  }

  try {
    List<String> sqlArray = sqlSplitter.splitSql(sql);
    for (String sqlToExecute : sqlArray) {
      // Locale.ROOT keeps the prefix match independent of the JVM default locale
      // (e.g. "LIST " lowercases to "l\u0131st " under a Turkish locale).
      String sqlTrimmedLowerCase = sqlToExecute.trim().toLowerCase(java.util.Locale.ROOT);
      if (sqlTrimmedLowerCase.startsWith("set ") ||
          sqlTrimmedLowerCase.startsWith("list ") ||
          sqlTrimmedLowerCase.startsWith("add ") ||
          sqlTrimmedLowerCase.startsWith("delete ")) {
        // some version of hive doesn't work with set statement with empty line ahead.
        // so we need to trim it first in this case.
        sqlToExecute = sqlToExecute.trim();
      }
      LOGGER.info("[{}|{}|{}] Execute sql: {}", user, noteId, paragraphId, sqlToExecute);

      // Both resources are scoped to a single statement so that the finally block below
      // never touches a result set left over from a previous iteration.
      Statement statement = null;
      ResultSet resultSet = null;
      try {
        // Created inside the try so the statement is closed even if configuring it fails.
        statement = connection.createStatement();
        // fetch n+1 rows in order to indicate there's more rows available (for large selects)
        statement.setFetchSize(context.getIntLocalProperty("limit", getMaxResult()));
        statement.setMaxRows(context.getIntLocalProperty("limit", maxRows));

        getJDBCConfiguration(user).saveStatement(paragraphId, statement);

        String statementPrecode =
            getProperty(String.format(STATEMENT_PRECODE_KEY_TEMPLATE, DEFAULT_KEY));
        if (StringUtils.isNotBlank(statementPrecode)) {
          statement.execute(statementPrecode);
        }

        // start hive monitor thread if it is hive jdbc
        String jdbcURL = getJDBCConfiguration(user).getProperty().getProperty(URL_KEY);
        String driver =
            getJDBCConfiguration(user).getProperty().getProperty(DRIVER_KEY);
        if (jdbcURL != null && driver != null) {
          if (driver.equals("org.apache.hive.jdbc.HiveDriver") &&
              jdbcURL.startsWith("jdbc:hive2://")) {
            HiveUtils.startHiveMonitorThread(statement, context,
                Boolean.parseBoolean(getProperty("hive.log.display", "true")), this);
          } else if (driver.equals("org.apache.kyuubi.jdbc.KyuubiHiveDriver") &&
              (jdbcURL.startsWith("jdbc:kyuubi://") || jdbcURL.startsWith("jdbc:hive2://"))) {
            KyuubiUtils.startMonitorThread(connection, statement, context,
                Boolean.parseBoolean(getProperty("kyuubi.log.display", "true")), this);
          }
        }

        boolean isResultSetAvailable = statement.execute(sqlToExecute);
        getJDBCConfiguration(user).setConnectionInDBDriverPoolSuccessful();
        if (isResultSetAvailable) {
          resultSet = statement.getResultSet();
          // Regards that the command is DDL.
          if (isDDLCommand(statement.getUpdateCount(),
              resultSet.getMetaData().getColumnCount())) {
            context.out.write("%text Query executed successfully.\n");
          } else {
            String template = context.getLocalProperties().get("template");
            if (!StringUtils.isBlank(template)) {
              // NOTE(review): the result of next() is ignored; presumably getFirstRow
              // copes with an empty result set -- confirm.
              resultSet.next();
              SingleRowInterpreterResult singleRowResult =
                  new SingleRowInterpreterResult(getFirstRow(resultSet), template, context);
              // NOTE(review): assumes an entry for this paragraph is always present in
              // isFirstRefreshMap -- confirm against where the map is populated.
              if (isFirstRefreshMap.get(context.getParagraphId())) {
                context.out.write(singleRowResult.toAngular());
                context.out.write("\n%text ");
                context.out.flush();
                isFirstRefreshMap.put(context.getParagraphId(), false);
              }
              singleRowResult.pushAngularObjects();
            } else {
              String results = getResults(resultSet,
                  !containsIgnoreCase(sqlToExecute, EXPLAIN_PREDICATE));
              context.out.write(results);
              context.out.write("\n%text ");
              context.out.flush();
            }
          }
        } else {
          // Response contains either an update count or there are no results.
          int updateCount = statement.getUpdateCount();
          context.out.write("\n%text " +
              "Query executed successfully. Affected rows : " +
              updateCount + "\n");
        }
      } finally {
        // Best-effort cleanup: a failure to close must not mask the statement's outcome.
        if (resultSet != null) {
          try {
            resultSet.close();
          } catch (SQLException e) {
            LOGGER.debug("Fail to close ResultSet", e);
          }
        }
        if (statement != null) {
          try {
            statement.close();
          } catch (SQLException e) {
            LOGGER.debug("Fail to close Statement", e);
          }
        }
      }
    }
  } catch (Throwable e) {
    LOGGER.error("Cannot run " + sql, e);
    if (e instanceof SQLException) {
      return new InterpreterResult(Code.ERROR, e.getMessage());
    } else {
      return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
    }
  } finally {
    // In case user ran an insert/update/upsert statement.
    // Commit and close are guarded separately so that a failing commit can no longer
    // skip close() and leak the connection.
    try {
      if (!connection.getAutoCommit()) {
        connection.commit();
      }
    } catch (SQLException e) {
      LOGGER.warn("Fail to commit connection for user: {}", user, e);
    } finally {
      try {
        connection.close();
      } catch (SQLException e) {
        LOGGER.warn("Fail to close connection for user: {}", user, e);
      }
    }
    getJDBCConfiguration(user).removeStatement(paragraphId);
  }
  return new InterpreterResult(Code.SUCCESS);
}